This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new adc719c6744 Remove old code for online upgrade TsFileV2 to TsFileV3
(#10363)
adc719c6744 is described below
commit adc719c6744382340444431c78f7d7913c578205
Author: Haonan <[email protected]>
AuthorDate: Thu Jun 29 11:15:33 2023 +0800
Remove old code for online upgrade TsFileV2 to TsFileV3 (#10363)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 -
.../org/apache/iotdb/db/conf/IoTDBStartCheck.java | 61 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 2 -
.../org/apache/iotdb/db/service/UpgradeSevice.java | 107 ----
.../iotdb/db/storageengine/StorageEngine.java | 4 -
.../db/storageengine/dataregion/DataRegion.java | 227 +-------
.../dataregion/read/control/FileReaderManager.java | 8 +-
.../read/reader/chunk/DiskChunkLoader.java | 1 -
.../chunk/metadata/DiskChunkMetadataLoader.java | 7 -
.../dataregion/tsfile/TsFileResource.java | 31 --
.../storageengine/upgrade/UpgradeCheckStatus.java | 40 --
.../iotdb/db/storageengine/upgrade/UpgradeLog.java | 93 ----
.../db/storageengine/upgrade/UpgradeTask.java | 169 ------
.../iotdb/db/tools/TsFileSplitByPartitionTool.java | 18 -
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 291 ----------
.../apache/iotdb/db/utils/ModificationUtils.java | 9 -
.../org/apache/iotdb/db/utils/UpgradeUtils.java | 166 ------
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 -
.../tsfile/file/metadata/AlignedChunkMetadata.java | 5 -
.../iotdb/tsfile/file/metadata/ChunkMetadata.java | 11 -
.../iotdb/tsfile/file/metadata/IChunkMetadata.java | 2 -
.../org/apache/iotdb/tsfile/read/common/Chunk.java | 9 -
.../tsfile/read/reader/chunk/ChunkReader.java | 58 +-
.../tsfile/v2/file/footer/ChunkGroupFooterV2.java | 87 ---
.../iotdb/tsfile/v2/file/header/ChunkHeaderV2.java | 113 ----
.../iotdb/tsfile/v2/file/header/PageHeaderV2.java | 50 --
.../tsfile/v2/file/metadata/ChunkMetadataV2.java | 52 --
.../v2/file/metadata/MetadataIndexEntryV2.java | 35 --
.../v2/file/metadata/MetadataIndexNodeV2.java | 45 --
.../v2/file/metadata/TimeseriesMetadataV2.java | 43 --
.../tsfile/v2/file/metadata/TsFileMetadataV2.java | 76 ---
.../v2/file/metadata/statistics/StatisticsV2.java | 104 ----
.../tsfile/v2/read/TsFileSequenceReaderForV2.java | 620 ---------------------
.../tsfile/v2/read/reader/page/PageReaderV2.java | 94 ----
34 files changed, 11 insertions(+), 2639 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9e8934bc01e..959db5ab547 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -647,9 +647,6 @@ public class IoTDBConfig {
/** TEXT encoding when creating schema automatically is enabled */
private TSEncoding defaultTextEncoding = TSEncoding.PLAIN;
- /** How many threads will be set up to perform upgrade tasks. */
- private int upgradeThreadCount = 1;
-
/** How many threads will be set up to perform settle tasks. */
private int settleThreadNum = 1;
@@ -2334,18 +2331,10 @@ public class IoTDBConfig {
this.hdfsPort = hdfsPort;
}
- public int getUpgradeThreadCount() {
- return upgradeThreadCount;
- }
-
public int getSettleThreadNum() {
return settleThreadNum;
}
- void setUpgradeThreadCount(int upgradeThreadCount) {
- this.upgradeThreadCount = upgradeThreadCount;
- }
-
String getDfsNameServices() {
return dfsNameServices;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index a0b36d30e50..32848b4f0cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -42,7 +42,6 @@ import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.Properties;
import java.util.function.Supplier;
@@ -262,71 +261,15 @@ public class IoTDBStartCheck {
+ " Please upgrade to v0.10 first");
System.exit(-1);
}
- // check whether upgrading from [v0.10, v.13]
String versionString = properties.getProperty(IOTDB_VERSION_STRING);
- if (versionString.startsWith("0.10") ||
versionString.startsWith("0.11")) {
- logger.error("IoTDB version is too old, please upgrade to 0.12
firstly.");
+ if (versionString.startsWith("0.")) {
+ logger.error("IoTDB version is too old");
System.exit(-1);
- } else if (versionString.startsWith("0.12") ||
versionString.startsWith("0.13")) {
- checkWALNotExists();
- upgradePropertiesFile();
}
checkImmutableSystemProperties();
}
}
- private void checkWALNotExists() {
- for (String walDir : commonConfig.getWalDirs()) {
- if (SystemFileFactory.INSTANCE.getFile(walDir).isDirectory()) {
- File[] sgWALs = SystemFileFactory.INSTANCE.getFile(walDir).listFiles();
- if (sgWALs != null) {
- for (File sgWAL : sgWALs) {
- // make sure wal directory of each sg is empty
- if (sgWAL.isDirectory() &&
Objects.requireNonNull(sgWAL.list()).length != 0) {
- logger.error(
- "WAL detected, please stop insertion and run 'SET SYSTEM TO
READONLY', then run 'flush' on IoTDB {} before upgrading to {}.",
- properties.getProperty(IOTDB_VERSION_STRING),
- IoTDBConstant.VERSION);
- System.exit(-1);
- }
- }
- }
- }
- }
- }
-
- /** upgrade 0.12 or 0.13 properties to 0.14 properties */
- private void upgradePropertiesFile() throws IOException {
- // create an empty tmpPropertiesFile
- if (tmpPropertiesFile.createNewFile()) {
- logger.info("Create system.properties.tmp {}.", tmpPropertiesFile);
- } else {
- logger.error("Create system.properties.tmp {} failed.",
tmpPropertiesFile);
- System.exit(-1);
- }
-
- try (FileOutputStream tmpFOS = new
FileOutputStream(tmpPropertiesFile.toString())) {
- systemProperties.forEach(
- (k, v) -> {
- if (!properties.containsKey(k)) {
- properties.setProperty(k, v.get());
- }
- });
- properties.setProperty(IOTDB_VERSION_STRING, IoTDBConstant.VERSION);
- // rename virtual_storage_group_num to data_region_num
- properties.setProperty(DATA_REGION_NUM,
properties.getProperty(VIRTUAL_STORAGE_GROUP_NUM));
- properties.remove(VIRTUAL_STORAGE_GROUP_NUM);
- properties.store(tmpFOS, SYSTEM_PROPERTIES_STRING);
-
- // upgrade finished, delete old system.properties file
- if (propertiesFile.exists()) {
- Files.delete(propertiesFile.toPath());
- }
- }
- // rename system.properties.tmp to system.properties
- FileUtils.moveFile(tmpPropertiesFile, propertiesFile);
- }
-
/** repair broken properties */
private void upgradePropertiesFileFromBrokenFile() throws IOException {
// create an empty tmpPropertiesFile
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 68c5208f430..e0a8d40d2ce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -533,8 +533,6 @@ public class DataNode implements DataNodeMBean {
// must init after SchemaEngine and StorageEngine prepared well
DataNodeRegionManager.getInstance().init();
- registerManager.register(UpgradeSevice.getINSTANCE());
-
// start region migrate service
registerManager.register(RegionMigrateService.getInstance());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
deleted file mode 100644
index c4fe7631d61..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/UpgradeSevice.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.service;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.storageengine.upgrade.UpgradeLog;
-import org.apache.iotdb.db.storageengine.upgrade.UpgradeTask;
-import org.apache.iotdb.db.utils.UpgradeUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class UpgradeSevice implements IService {
-
- private static final Logger logger =
LoggerFactory.getLogger(UpgradeSevice.class);
-
- private ExecutorService upgradeThreadPool;
- private static final AtomicInteger cntUpgradeFileNum = new AtomicInteger();
-
- private UpgradeSevice() {}
-
- public static UpgradeSevice getINSTANCE() {
- return InstanceHolder.INSTANCE;
- }
-
- public static class InstanceHolder {
- private static final UpgradeSevice INSTANCE = new UpgradeSevice();
-
- private InstanceHolder() {}
- }
-
- @Override
- public void start() {
- int updateThreadNum =
IoTDBDescriptor.getInstance().getConfig().getUpgradeThreadCount();
- if (updateThreadNum <= 0) {
- updateThreadNum = 1;
- }
- upgradeThreadPool =
IoTDBThreadPoolFactory.newFixedThreadPool(updateThreadNum, "UpgradeThread");
- UpgradeLog.createUpgradeLog();
- countUpgradeFiles();
- if (cntUpgradeFileNum.get() == 0) {
- stop();
- return;
- }
- upgradeAll();
- }
-
- @Override
- public void stop() {
- UpgradeLog.closeLogWriter();
- UpgradeUtils.clearUpgradeRecoverMap();
- if (upgradeThreadPool != null) {
- upgradeThreadPool.shutdownNow();
- logger.info("Waiting for upgrade task pool to shut down");
- upgradeThreadPool = null;
- logger.info("Upgrade service stopped");
- }
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.UPGRADE_SERVICE;
- }
-
- public static AtomicInteger getTotalUpgradeFileNum() {
- return cntUpgradeFileNum;
- }
-
- public void submitUpgradeTask(UpgradeTask upgradeTask) {
- upgradeThreadPool.submit(upgradeTask);
- }
-
- private static void countUpgradeFiles() {
- //
cntUpgradeFileNum.addAndGet(StorageEngine.getInstance().countUpgradeFiles());
- // logger.info("finish counting upgrading files, total num:{}",
cntUpgradeFileNum);
- }
-
- private static void upgradeAll() {
- // try {
- // StorageEngine.getInstance().upgradeAll();
- // } catch (StorageEngineException e) {
- // logger.error("Cannot perform a global upgrade because", e);
- // }
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 13636e9efee..29cc858d5d0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -61,7 +61,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.ThreadUtils;
-import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
@@ -309,9 +308,6 @@ public class StorageEngine implements IService {
throw new StorageEngineFailureException(e);
}
- // recover upgrade process
- UpgradeUtils.recoverUpgrade();
-
recover();
ttlCheckThread =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index a390bc45644..8f9fa8cde85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -25,7 +25,6 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -82,7 +81,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManag
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.SimpleFileVersionController;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.VersionController;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
@@ -98,12 +96,9 @@ import
org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
-import org.apache.iotdb.db.storageengine.upgrade.UpgradeCheckStatus;
-import org.apache.iotdb.db.storageengine.upgrade.UpgradeLog;
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.DateTimeUtils;
-import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -132,7 +127,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -141,7 +135,6 @@ import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
@@ -209,20 +202,14 @@ public class DataRegion implements IDataRegionForQuery {
/** time partition id in the database -> tsFileProcessor for this time
partition. */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors
= new TreeMap<>();
- // upgrading sequence TsFile resource list
- private final List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
/** sequence tsfile processors which are closing. */
private final CopyOnReadLinkedList<TsFileProcessor>
closingSequenceTsFileProcessor =
new CopyOnReadLinkedList<>();
- /** upgrading unsequence TsFile resource list. */
- private final List<TsFileResource> upgradeUnseqFileList = new LinkedList<>();
/** unsequence tsfile processors which are closing. */
private final CopyOnReadLinkedList<TsFileProcessor>
closingUnSequenceTsFileProcessor =
new CopyOnReadLinkedList<>();
- private final AtomicInteger upgradeFileCount = new AtomicInteger();
-
private final AtomicBoolean isSettling = new AtomicBoolean();
/** data region id. */
@@ -434,20 +421,10 @@ public class DataRegion implements IDataRegionForQuery {
try {
// collect candidate TsFiles from sequential and unsequential data
directory
- Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
+ List<TsFileResource> tmpSeqTsFiles =
getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
- List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
- List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
- upgradeSeqFileList.addAll(oldSeqTsFiles);
- Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair =
+ List<TsFileResource> tmpUnseqTsFiles =
getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
- List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
- List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
- upgradeUnseqFileList.addAll(oldUnseqTsFiles);
-
- if (upgradeSeqFileList.size() + upgradeUnseqFileList.size() != 0) {
- upgradeFileCount.set(upgradeSeqFileList.size() +
upgradeUnseqFileList.size());
- }
// split by partition so that we can find the last file of each
partition and decide to
// close it or not
@@ -562,15 +539,6 @@ public class DataRegion implements IDataRegionForQuery {
long partitionNum = resource.getTimePartition();
updatePartitionFileVersion(partitionNum, resource.getVersion());
}
- for (TsFileResource resource : upgradeSeqFileList) {
- long partitionNum = resource.getTimePartition();
- updatePartitionFileVersion(partitionNum, resource.getVersion());
- }
- for (TsFileResource resource : upgradeUnseqFileList) {
- long partitionNum = resource.getTimePartition();
- updatePartitionFileVersion(partitionNum, resource.getVersion());
- }
- updateLatestFlushedTime();
} catch (IOException e) {
throw new DataRegionException(e);
}
@@ -631,61 +599,16 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /**
- * use old seq file to update latestTimeForEachDevice,
globalLatestFlushedTimeForEachDevice,
- * partitionLatestFlushedTimeForEachDevice and
timePartitionIdVersionControllerMap.
- */
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- private void updateLatestFlushedTime() throws IOException {
-
- VersionController versionController =
- new SimpleFileVersionController(storageGroupSysDir.getPath());
- long currentVersion = versionController.currVersion();
- for (TsFileResource resource : upgradeSeqFileList) {
- for (String deviceId : resource.getDevices()) {
- long endTime = resource.getEndTime(deviceId);
- long endTimePartitionId = StorageEngine.getTimePartition(endTime);
- lastFlushTimeMap.setOneDeviceGlobalFlushedTime(deviceId, endTime);
-
- // set all the covered partition's LatestFlushedTime
- long partitionId =
StorageEngine.getTimePartition(resource.getStartTime(deviceId));
- while (partitionId <= endTimePartitionId) {
- lastFlushTimeMap.setOneDeviceFlushedTime(partitionId, deviceId,
endTime);
- if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
- File directory =
- SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
String.valueOf(partitionId));
- if (!directory.exists()) {
- directory.mkdirs();
- }
- File versionFile =
- SystemFileFactory.INSTANCE.getFile(
- directory, SimpleFileVersionController.FILE_PREFIX +
currentVersion);
- if (!versionFile.createNewFile()) {
- logger.warn("Version file {} has already been created ",
versionFile);
- }
- timePartitionIdVersionControllerMap.put(
- partitionId,
- new SimpleFileVersionController(storageGroupSysDir.getPath(),
partitionId));
- }
- partitionId++;
- }
- }
- }
- }
-
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- private Pair<List<TsFileResource>, List<TsFileResource>>
getAllFiles(List<String> folders)
+ private List<TsFileResource> getAllFiles(List<String> folders)
throws IOException, DataRegionException {
// "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files
in one time partition
Map<String, File> tsFilePartitionPath2File = new HashMap<>();
- List<File> upgradeFiles = new ArrayList<>();
for (String baseDir : folders) {
File fileFolder = fsFactory.getFile(baseDir + File.separator +
databaseName, dataRegionId);
if (!fileFolder.exists()) {
continue;
}
-
- // old version
// some TsFileResource may be being persisted when the system crashed,
try recovering such
// resources
continueFailedRenames(fileFolder, TEMP_SUFFIX);
@@ -695,12 +618,10 @@ public class DataRegion implements IDataRegionForQuery {
for (File partitionFolder : subFiles) {
if (!partitionFolder.isDirectory()) {
logger.warn("{} is not a directory.",
partitionFolder.getAbsolutePath());
- } else if
(!partitionFolder.getName().equals(IoTDBConstant.UPGRADE_FOLDER_NAME)) {
+ } else {
// some TsFileResource may be being persisted when the system
crashed, try recovering
- // such
- // resources
+ // such resources
continueFailedRenames(partitionFolder, TEMP_SUFFIX);
-
String partitionName = partitionFolder.getName();
File[] tsFilesInThisFolder =
fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
TSFILE_SUFFIX);
@@ -708,12 +629,6 @@ public class DataRegion implements IDataRegionForQuery {
String tsFilePartitionPath = partitionName + File.separator +
f.getName();
tsFilePartitionPath2File.put(tsFilePartitionPath, f);
}
-
- } else {
- // collect old TsFiles for upgrading
- Collections.addAll(
- upgradeFiles,
- fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
TSFILE_SUFFIX));
}
}
}
@@ -728,17 +643,7 @@ public class DataRegion implements IDataRegionForQuery {
checkTsFileTime(f, currentTime);
ret.add(new TsFileResource(f));
}
- upgradeFiles.sort(this::compareFileName);
- List<TsFileResource> upgradeRet = new ArrayList<>();
- for (File f : upgradeFiles) {
- checkTsFileTime(f, currentTime);
- TsFileResource fileResource = new TsFileResource(f);
- fileResource.setStatus(TsFileResourceStatus.NORMAL);
- // make sure the flush command is called before IoTDB is down.
- fileResource.deserializeFromOldFile();
- upgradeRet.add(fileResource);
- }
- return new Pair<>(ret, upgradeRet);
+ return ret;
}
private void continueFailedRenames(File fileFolder, String suffix) throws
IOException {
@@ -1762,7 +1667,6 @@ public class DataRegion implements IDataRegionForQuery {
List<TsFileResource> seqResources =
getFileResourceListForQuery(
tsFileManager.getTsFileList(true),
- upgradeSeqFileList,
pathList,
singleDeviceId,
context,
@@ -1771,7 +1675,6 @@ public class DataRegion implements IDataRegionForQuery {
List<TsFileResource> unseqResources =
getFileResourceListForQuery(
tsFileManager.getTsFileList(false),
- upgradeUnseqFileList,
pathList,
singleDeviceId,
context,
@@ -1823,7 +1726,6 @@ public class DataRegion implements IDataRegionForQuery {
*/
private List<TsFileResource> getFileResourceListForQuery(
Collection<TsFileResource> tsFileResources,
- List<TsFileResource> upgradeTsFileResources,
List<PartialPath> pathList,
String singleDeviceId,
QueryContext context,
@@ -1846,20 +1748,6 @@ public class DataRegion implements IDataRegionForQuery {
dataTTL != Long.MAX_VALUE ? DateTimeUtils.currentTime() - dataTTL :
Long.MIN_VALUE;
context.setQueryTimeLowerBound(timeLowerBound);
- // for upgrade files and old files must be closed
- for (TsFileResource tsFileResource : upgradeTsFileResources) {
- if (!tsFileResource.isSatisfied(
- singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
- continue;
- }
- closeQueryLock.readLock().lock();
- try {
- tsfileResourcesForQuery.add(tsFileResource);
- } finally {
- closeQueryLock.readLock().unlock();
- }
- }
-
for (TsFileResource tsFileResource : tsFileResources) {
if (!tsFileResource.isSatisfied(
singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
@@ -1921,11 +1809,6 @@ public class DataRegion implements IDataRegionForQuery {
long searchIndex,
TimePartitionFilter timePartitionFilter)
throws IOException {
- // If there are still some old version tsfiles, the delete won't succeeded.
- if (upgradeFileCount.get() != 0) {
- throw new IOException(
- "Delete failed. " + "Please do not delete until the old files
upgraded.");
- }
if (SettleService.getINSTANCE().getFilesToBeSettledCount().get() != 0) {
throw new IOException(
"Delete failed. " + "Please do not delete until the old files
settled.");
@@ -2157,13 +2040,6 @@ public class DataRegion implements IDataRegionForQuery {
workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) !=
null);
}
- /** used for upgrading */
- public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
- long partitionId, String deviceId, long time) {
- lastFlushTimeMap.updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
- partitionId, deviceId, time);
- }
-
/** put the memtable back to the MemTablePool and make the metadata in
writer visible */
// TODO please consider concurrency with read and insert method.
private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor
tsFileProcessor)
@@ -2220,53 +2096,6 @@ public class DataRegion implements IDataRegionForQuery {
return trySubmitCount;
}
- /**
- * count all TsFiles in the database which need to be upgraded
- *
- * @return total num of the tsfiles which need to be upgraded in the database
- */
- public int countUpgradeFiles() {
- return upgradeFileCount.get();
- }
-
- /** upgrade all files belongs to this database */
- public void upgrade() {
- for (TsFileResource seqTsFileResource : upgradeSeqFileList) {
-
seqTsFileResource.setUpgradeTsFileResourceCallBack(this::upgradeTsFileResourceCallBack);
- seqTsFileResource.doUpgrade();
- }
- for (TsFileResource unseqTsFileResource : upgradeUnseqFileList) {
-
unseqTsFileResource.setUpgradeTsFileResourceCallBack(this::upgradeTsFileResourceCallBack);
- unseqTsFileResource.doUpgrade();
- }
- }
-
- private void upgradeTsFileResourceCallBack(TsFileResource tsFileResource) {
- List<TsFileResource> upgradedResources =
tsFileResource.getUpgradedResources();
- for (TsFileResource resource : upgradedResources) {
- long partitionId = resource.getTimePartition();
- resource
- .getDevices()
- .forEach(
- device ->
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
- partitionId, device, resource.getEndTime(device)));
- }
- upgradeFileCount.getAndAdd(-1);
- // load all upgraded resources in this sg to tsFileResourceManager
- if (upgradeFileCount.get() == 0) {
- writeLock("upgradeTsFileResourceCallBack");
- try {
- loadUpgradedResources(upgradeSeqFileList, true);
- loadUpgradedResources(upgradeUnseqFileList, false);
- } finally {
- writeUnlock();
- }
- // after upgrade complete, update partitionLatestFlushedTimeForEachDevice
- lastFlushTimeMap.applyNewlyFlushedTimeToFlushedTime();
- }
- }
-
/**
* After finishing settling tsfile, we need to do 2 things : (1) move the
new tsfile to the
* correct folder, including deleting its old mods file (2) update the
relevant data of this old
@@ -2309,44 +2138,6 @@ public class DataRegion implements IDataRegionForQuery {
BloomFilterCache.getInstance().clear();
}
- private void loadUpgradedResources(List<TsFileResource> resources, boolean
isseq) {
- if (resources.isEmpty()) {
- return;
- }
- for (TsFileResource resource : resources) {
- resource.writeLock();
- try {
- UpgradeUtils.moveUpgradedFiles(resource);
- tsFileManager.addAll(resource.getUpgradedResources(), isseq);
- // delete old TsFile and resource
- resource.delete();
- Files.deleteIfExists(
- fsFactory
- .getFile(resource.getTsFile().toPath() +
ModificationFile.FILE_SUFFIX)
- .toPath());
- UpgradeLog.writeUpgradeLogFile(
- resource.getTsFile().getAbsolutePath() + "," +
UpgradeCheckStatus.UPGRADE_SUCCESS);
- } catch (IOException e) {
- logger.error("Unable to load {}, caused by ", resource, e);
- } finally {
- resource.writeUnlock();
- }
- }
- // delete upgrade folder when it is empty
- if (resources.get(0).getTsFile().getParentFile().isDirectory()
- && resources.get(0).getTsFile().getParentFile().listFiles().length ==
0) {
- try {
- Files.delete(resources.get(0).getTsFile().getParentFile().toPath());
- } catch (IOException e) {
- logger.error(
- "Delete upgrade folder {} failed, caused by ",
- resources.get(0).getTsFile().getParentFile(),
- e);
- }
- }
- resources.clear();
- }
-
/** merge file under this database processor */
public int compact() {
writeLock("merge");
@@ -3085,12 +2876,6 @@ public class DataRegion implements IDataRegionForQuery {
void call(TsFileProcessor caller, Map<String, Long> updateMap, long
systemFlushTime);
}
- @FunctionalInterface
- public interface UpgradeTsFileResourceCallBack {
-
- void call(TsFileResource caller);
- }
-
@FunctionalInterface
public interface CompactionRecoverCallBack {
void call();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManager.java
index 12a122ea24a..fde957a1d5b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManager.java
@@ -24,7 +24,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.UnClosedTsFileReader;
-import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,12 +130,7 @@ public class FileReaderManager {
tsFileReader = new TsFileSequenceReader(filePath);
if (tsFileReader.readVersionNumber() != TSFileConfig.VERSION_NUMBER) {
tsFileReader.close();
- tsFileReader = new TsFileSequenceReaderForV2(filePath);
- if (!((TsFileSequenceReaderForV2) tsFileReader)
- .readVersionNumberV2()
- .equals(TSFileConfig.VERSION_NUMBER_V2)) {
- throw new IOException("The version of this TsFile is not corrent.
");
- }
+ throw new IOException("The version of this TsFile is not correct.");
}
}
readerMap.put(filePath, tsFileReader);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java
index 4455f3c0fcf..a58543587d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskChunkLoader.java
@@ -62,7 +62,6 @@ public class DiskChunkLoader implements IChunkLoader {
long t1 = System.nanoTime();
try {
Chunk chunk = ChunkCache.getInstance().get((ChunkMetadata)
chunkMetaData, debug);
- chunk.setFromOldFile(chunkMetaData.isFromOldTsFile());
long t2 = System.nanoTime();
IChunkReader chunkReader = new ChunkReader(chunk, timeFilter);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
index b8c80a6578f..1a431895d1d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -114,13 +114,6 @@ public class DiskChunkMetadataLoader implements
IChunkMetadataLoader {
SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
CHUNK_METADATA_FILTER_NONALIGNED_DISK, System.nanoTime() - t3);
- // For chunkMetadata from old TsFile, do not set version
- for (IChunkMetadata metadata : chunkMetadataList) {
- if (!metadata.isFromOldTsFile()) {
- metadata.setVersion(resource.getVersion());
- }
- }
-
if (context.isDebug()) {
DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: ");
chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 9f9ff0ae2ba..adbf8a59b39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
-import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -38,7 +37,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeInd
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
-import org.apache.iotdb.db.storageengine.upgrade.UpgradeTask;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
@@ -117,14 +115,6 @@ public class TsFileResource {
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
- /** generated upgraded TsFile ResourceList used for upgrading v0.11.x/v2 ->
0.12/v3 */
- private List<TsFileResource> upgradedResources;
-
- /**
- * load upgraded TsFile Resources to database processor used for upgrading
v0.11.x/v2 -> 0.12/v3
- */
- private DataRegion.UpgradeTsFileResourceCallBack
upgradeTsFileResourceCallBack;
-
private DataRegion.SettleTsFileCallBack settleTsFileCallBack;
/** Maximum index of plans executed within this TsFile. */
@@ -591,10 +581,6 @@ public class TsFileResource {
return tsFileLock.tryReadLock();
}
- public void doUpgrade() {
- UpgradeSevice.getINSTANCE().submitUpgradeTask(new UpgradeTask(this));
- }
-
public void removeModFile() throws IOException {
getModFile().remove();
modFile = null;
@@ -860,23 +846,6 @@ public class TsFileResource {
this.pathToTimeSeriesMetadataMap.put(path, timeSeriesMetadata);
}
- public void setUpgradedResources(List<TsFileResource> upgradedResources) {
- this.upgradedResources = upgradedResources;
- }
-
- public List<TsFileResource> getUpgradedResources() {
- return upgradedResources;
- }
-
- public void setUpgradeTsFileResourceCallBack(
- DataRegion.UpgradeTsFileResourceCallBack upgradeTsFileResourceCallBack) {
- this.upgradeTsFileResourceCallBack = upgradeTsFileResourceCallBack;
- }
-
- public DataRegion.UpgradeTsFileResourceCallBack
getUpgradeTsFileResourceCallBack() {
- return upgradeTsFileResourceCallBack;
- }
-
public DataRegion.SettleTsFileCallBack getSettleTsFileCallBack() {
return settleTsFileCallBack;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeCheckStatus.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeCheckStatus.java
deleted file mode 100644
index 486d953bff4..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeCheckStatus.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.storageengine.upgrade;
-
-public enum UpgradeCheckStatus {
- BEGIN_UPGRADE_FILE(1),
- AFTER_UPGRADE_FILE(2),
- UPGRADE_SUCCESS(3);
-
- private final int checkStatusCode;
-
- UpgradeCheckStatus(int checkStatusCode) {
- this.checkStatusCode = checkStatusCode;
- }
-
- public int getCheckStatusCode() {
- return checkStatusCode;
- }
-
- @Override
- public String toString() {
- return String.valueOf(checkStatusCode);
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeLog.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeLog.java
deleted file mode 100644
index 0b15d80701e..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeLog.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.storageengine.upgrade;
-
-import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.utils.UpgradeUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-
-public class UpgradeLog {
-
- private static final Logger logger =
LoggerFactory.getLogger(UpgradeLog.class);
-
- private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
- private static final String UPGRADE_DIR = "upgrade";
- private static final String UPGRADE_LOG_NAME = "upgrade.txt";
- private static BufferedWriter upgradeLogWriter;
- private static File upgradeLogPath =
- SystemFileFactory.INSTANCE.getFile(
- SystemFileFactory.INSTANCE.getFile(config.getSystemDir(),
UPGRADE_DIR), UPGRADE_LOG_NAME);
-
- private UpgradeLog() {}
-
- public static boolean createUpgradeLog() {
- try {
- if (!upgradeLogPath.getParentFile().exists()) {
- upgradeLogPath.getParentFile().mkdirs();
- }
- if (!upgradeLogPath.createNewFile()) {
- logger.warn("create upgrade log file fail");
- }
-
- upgradeLogWriter = new BufferedWriter(new
FileWriter(getUpgradeLogPath(), true));
- return true;
- } catch (IOException e) {
- logger.error("meet error when create upgrade log, file path:{}",
upgradeLogPath, e);
- return false;
- }
- }
-
- public static String getUpgradeLogPath() {
- return upgradeLogPath.getAbsolutePath();
- }
-
- public static boolean writeUpgradeLogFile(String content) {
- UpgradeUtils.getUpgradeLogLock().writeLock().lock();
- try {
- upgradeLogWriter.write(content);
- upgradeLogWriter.newLine();
- upgradeLogWriter.flush();
- return true;
- } catch (IOException e) {
- logger.error("write upgrade log file failed, the log file:{}",
getUpgradeLogPath(), e);
- return false;
- } finally {
- UpgradeUtils.getUpgradeLogLock().writeLock().unlock();
- }
- }
-
- public static void closeLogWriter() {
- try {
- if (upgradeLogWriter != null) {
- upgradeLogWriter.close();
- }
- } catch (IOException e) {
- logger.error("close upgrade log file failed, the log file:{}",
getUpgradeLogPath(), e);
- }
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeTask.java
deleted file mode 100644
index bca797fcfa6..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/upgrade/UpgradeTask.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.storageengine.upgrade;
-
-import org.apache.iotdb.commons.concurrent.WrappedRunnable;
-import org.apache.iotdb.db.service.UpgradeSevice;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
-import org.apache.iotdb.db.tools.upgrade.TsFileOnlineUpgradeTool;
-import org.apache.iotdb.db.utils.UpgradeUtils;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class UpgradeTask extends WrappedRunnable {
-
- private TsFileResource upgradeResource;
- private static final Logger logger =
LoggerFactory.getLogger(UpgradeTask.class);
- private static final String COMMA_SEPERATOR = ",";
-
- private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
-
- public UpgradeTask(TsFileResource upgradeResource) {
- this.upgradeResource = upgradeResource;
- }
-
- @Override
- public void runMayThrow() {
- try {
- String oldTsfilePath = upgradeResource.getTsFile().getAbsolutePath();
- List<TsFileResource> upgradedResources;
- if
(!UpgradeUtils.isUpgradedFileGenerated(upgradeResource.getTsFile().getName())) {
- logger.info("generate upgraded file for {}",
upgradeResource.getTsFile());
- upgradedResources = generateUpgradedFiles();
- } else {
- logger.info("find upgraded file for {}", upgradeResource.getTsFile());
- upgradedResources = findUpgradedFiles();
- }
- upgradeResource.setUpgradedResources(upgradedResources);
- upgradeResource.getUpgradeTsFileResourceCallBack().call(upgradeResource);
- UpgradeSevice.getTotalUpgradeFileNum().getAndAdd(-1);
- logger.info(
- "Upgrade completes, file path:{} , the remaining upgraded file num:
{}",
- oldTsfilePath,
- UpgradeSevice.getTotalUpgradeFileNum().get());
- if (UpgradeSevice.getTotalUpgradeFileNum().get() == 0) {
- logger.info("Start delete empty tmp folders");
-
clearTmpFolders(TierManager.getInstance().getAllLocalSequenceFileFolders());
-
clearTmpFolders(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
- UpgradeSevice.getINSTANCE().stop();
- logger.info("All files upgraded successfully! ");
- }
- } catch (Exception e) {
- logger.error(
- "meet error when upgrade file:{}",
upgradeResource.getTsFile().getAbsolutePath(), e);
- }
- }
-
- private List<TsFileResource> generateUpgradedFiles() throws IOException,
WriteProcessException {
- upgradeResource.readLock();
- String oldTsfilePath = upgradeResource.getTsFile().getAbsolutePath();
- List<TsFileResource> upgradedResources = new ArrayList<>();
- UpgradeLog.writeUpgradeLogFile(
- oldTsfilePath + COMMA_SEPERATOR +
UpgradeCheckStatus.BEGIN_UPGRADE_FILE);
- try {
- TsFileOnlineUpgradeTool.upgradeOneTsFile(upgradeResource,
upgradedResources);
- UpgradeLog.writeUpgradeLogFile(
- oldTsfilePath + COMMA_SEPERATOR +
UpgradeCheckStatus.AFTER_UPGRADE_FILE);
- } finally {
- upgradeResource.readUnlock();
- }
- return upgradedResources;
- }
-
- private List<TsFileResource> findUpgradedFiles() throws IOException {
- upgradeResource.readLock();
- List<TsFileResource> upgradedResources = new ArrayList<>();
- String oldTsfilePath = upgradeResource.getTsFile().getAbsolutePath();
- UpgradeLog.writeUpgradeLogFile(
- oldTsfilePath + COMMA_SEPERATOR +
UpgradeCheckStatus.BEGIN_UPGRADE_FILE);
- try {
- File upgradeFolder = upgradeResource.getTsFile().getParentFile();
- for (File tempPartitionDir : upgradeFolder.listFiles()) {
- if (tempPartitionDir.isDirectory()
- && fsFactory
- .getFile(
- tempPartitionDir,
- upgradeResource.getTsFile().getName() +
TsFileResource.RESOURCE_SUFFIX)
- .exists()) {
- TsFileResource resource =
- new TsFileResource(
- fsFactory.getFile(tempPartitionDir,
upgradeResource.getTsFile().getName()));
- resource.deserialize();
- upgradedResources.add(resource);
- }
- }
- UpgradeLog.writeUpgradeLogFile(
- oldTsfilePath + COMMA_SEPERATOR +
UpgradeCheckStatus.AFTER_UPGRADE_FILE);
- } finally {
- upgradeResource.readUnlock();
- }
- return upgradedResources;
- }
-
- @SuppressWarnings("squid:S3776")
- private void clearTmpFolders(List<String> folders) {
- for (String baseDir : folders) {
- File fileFolder = fsFactory.getFile(baseDir);
- if (!fileFolder.isDirectory()) {
- continue;
- }
- for (File storageGroup : Objects.requireNonNull(fileFolder.listFiles()))
{
- if (!storageGroup.isDirectory()) {
- continue;
- }
- File virtualStorageGroupDir = fsFactory.getFile(storageGroup, "0");
- File upgradeDir = fsFactory.getFile(virtualStorageGroupDir, "upgrade");
- if (upgradeDir != null) {
- File[] tmpPartitionDirList = upgradeDir.listFiles();
- if (tmpPartitionDirList != null) {
- for (File tmpPartitionDir : tmpPartitionDirList) {
- if (tmpPartitionDir.isDirectory()) {
- try {
- Files.delete(tmpPartitionDir.toPath());
- } catch (IOException e) {
- logger.error("Delete tmpPartitionDir {} failed",
tmpPartitionDir);
- }
- }
- }
- // delete upgrade folder when it is empty
- if (upgradeDir.isDirectory()) {
- try {
- Files.delete(upgradeDir.toPath());
- } catch (IOException e) {
- logger.error("Delete tmpUpgradeDir {} failed", upgradeDir);
- }
- }
- }
- }
- }
- }
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
index 3a15872502c..bb0295a2484 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/TsFileSplitByPartitionTool.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -110,23 +109,6 @@ public class TsFileSplitByPartitionTool implements
AutoCloseable {
}
}
- public TsFileSplitByPartitionTool(TsFileResource resourceToBeRewritten,
boolean needReaderForV2)
- throws IOException {
- oldTsFileResource = resourceToBeRewritten;
- oldTsFile = resourceToBeRewritten.getTsFile();
- String file = oldTsFile.getAbsolutePath();
- if (needReaderForV2) {
- reader = new TsFileSequenceReaderForV2(file);
- } else {
- reader = new TsFileSequenceReader(file);
- }
- partitionWriterMap = new HashMap<>();
- if (FSFactoryProducer.getFSFactory().getFile(file +
ModificationFile.FILE_SUFFIX).exists()) {
- oldModification = (List<Modification>)
resourceToBeRewritten.getModFile().getModifications();
- modsIterator = oldModification.iterator();
- }
- }
-
/**
* Rewrite an old file to the latest version
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
deleted file mode 100644
index d8a97973e0f..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.tools.upgrade;
-
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.tools.TsFileSplitByPartitionTool;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
-import org.apache.iotdb.tsfile.v2.read.reader.page.PageReaderV2;
-import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class TsFileOnlineUpgradeTool extends TsFileSplitByPartitionTool {
-
- private static final Logger logger =
LoggerFactory.getLogger(TsFileOnlineUpgradeTool.class);
-
- /**
- * Create a file reader of the given file. This reader will read the old
file and rewrite it to a
- * new format(v3) file
- *
- * @param resourceToBeUpgraded the old tsfile resource which need to be
upgrade
- * @throws IOException If some I/O error occurs
- */
- public TsFileOnlineUpgradeTool(TsFileResource resourceToBeUpgraded) throws
IOException {
- super(resourceToBeUpgraded, true);
- }
-
- /**
- * upgrade a single TsFile.
- *
- * @param resourceToBeUpgraded the old file's resource which need to be
upgrade.
- * @param upgradedResources new version tsFiles' resources
- */
- public static void upgradeOneTsFile(
- TsFileResource resourceToBeUpgraded, List<TsFileResource>
upgradedResources)
- throws IOException, WriteProcessException {
- try (TsFileOnlineUpgradeTool updater = new
TsFileOnlineUpgradeTool(resourceToBeUpgraded)) {
- updater.upgradeFile(upgradedResources);
- }
- }
-
- /** upgrade file resource */
- @SuppressWarnings({"squid:S3776", "deprecation"}) // Suppress high Cognitive
Complexity warning
- private void upgradeFile(List<TsFileResource> upgradedResources)
- throws IOException, WriteProcessException {
-
- // check if the old TsFile has correct header
- if (!fileCheck()) {
- return;
- }
-
- int headerLength =
- TSFileConfig.MAGIC_STRING.getBytes().length
- + TSFileConfig.VERSION_NUMBER_V2.getBytes().length;
- reader.position(headerLength);
- byte marker;
- long firstChunkPositionInChunkGroup = headerLength;
- boolean firstChunkInChunkGroup = true;
- String deviceId = null;
- boolean skipReadingChunk = true;
- long chunkHeaderOffset;
- try {
- while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
- switch (marker) {
- case MetaMarker.CHUNK_HEADER:
- chunkHeaderOffset = reader.position() - 1;
- if (skipReadingChunk || deviceId == null) {
- ChunkHeader header = ((TsFileSequenceReaderForV2)
reader).readChunkHeader();
- int dataSize = header.getDataSize();
- while (dataSize > 0) {
- // a new Page
- PageHeader pageHeader =
- ((TsFileSequenceReaderForV2)
reader).readPageHeader(header.getDataType());
- ((TsFileSequenceReaderForV2)
reader).readCompressedPage(pageHeader);
- dataSize -=
- (Integer.BYTES * 2 // the bytes size of uncompressedSize
and compressedSize
- // count, startTime, endTime bytes size in old
statistics
- + 24
- // statistics bytes size
- // new boolean StatsSize is 8 bytes larger than old one
- + (pageHeader.getStatistics().getStatsSize()
- - (header.getDataType() == TSDataType.BOOLEAN ? 8
: 0))
- // page data bytes
- + pageHeader.getCompressedSize());
- }
- } else {
- ChunkHeader header = ((TsFileSequenceReaderForV2)
reader).readChunkHeader();
- MeasurementSchema measurementSchema =
- new MeasurementSchema(
- header.getMeasurementID(),
- header.getDataType(),
- header.getEncodingType(),
- header.getCompressionType());
- TSDataType dataType = header.getDataType();
- TSEncoding encoding = header.getEncodingType();
- List<PageHeader> pageHeadersInChunk = new ArrayList<>();
- List<ByteBuffer> dataInChunk = new ArrayList<>();
- List<Boolean> needToDecodeInfo = new ArrayList<>();
- int dataSize = header.getDataSize();
- while (dataSize > 0) {
- // a new Page
- PageHeader pageHeader =
- ((TsFileSequenceReaderForV2)
reader).readPageHeader(dataType);
- boolean needToDecode =
- checkIfNeedToDecode(
- dataType,
- encoding,
- pageHeader,
- measurementSchema,
- deviceId,
- chunkHeaderOffset);
- needToDecodeInfo.add(needToDecode);
- ByteBuffer pageData =
- !needToDecode
- ? reader.readCompressedPage(pageHeader)
- : reader.readPage(pageHeader,
header.getCompressionType());
- pageHeadersInChunk.add(pageHeader);
- dataInChunk.add(pageData);
- dataSize -=
- (Integer.BYTES * 2 // the bytes size of uncompressedSize
and compressedSize
- // count, startTime, endTime bytes size in old
statistics
- + 24
- // statistics bytes size
- // new boolean StatsSize is 8 bytes larger than old one
- + (pageHeader.getStatistics().getStatsSize()
- - (dataType == TSDataType.BOOLEAN ? 8 : 0))
- // page data bytes
- + pageHeader.getCompressedSize());
- }
- reWriteChunk(
- deviceId,
- firstChunkInChunkGroup,
- measurementSchema,
- pageHeadersInChunk,
- dataInChunk,
- needToDecodeInfo,
- chunkHeaderOffset);
- if (firstChunkInChunkGroup) {
- firstChunkInChunkGroup = false;
- }
- }
- break;
- case MetaMarker.CHUNK_GROUP_HEADER:
- // this is the footer of a ChunkGroup in TsFileV2.
- if (skipReadingChunk) {
- skipReadingChunk = false;
- ChunkGroupHeader chunkGroupFooter =
- ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
- deviceId = chunkGroupFooter.getDeviceID();
- reader.position(firstChunkPositionInChunkGroup);
- } else {
- endChunkGroup();
- skipReadingChunk = true;
- ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
- deviceId = null;
- firstChunkPositionInChunkGroup = reader.position();
- firstChunkInChunkGroup = true;
- }
- break;
- case MetaMarker.VERSION:
- long version = ((TsFileSequenceReaderForV2) reader).readVersion();
- // write plan indices for ending memtable
- for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
- tsFileIOWriter.writePlanIndices();
- }
- firstChunkPositionInChunkGroup = reader.position();
- break;
- default:
- // the disk file is corrupted, using this file may be dangerous
- throw new IOException("Unrecognized marker detected, " + "this
file may be corrupted");
- }
- }
- // close upgraded tsFiles and generate resources for them
- for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
- upgradedResources.add(endFileAndGenerateResource(tsFileIOWriter));
- }
-
- oldTsFileResource.removeModFile();
-
- } catch (Exception e2) {
- throw new IOException(
- "TsFile upgrade process cannot proceed at position "
- + reader.position()
- + "because: "
- + e2.getMessage());
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
- }
-
- /** TsFileName is changing like from 1610635230693-1-0.tsfile to
1610635230693-1-0-0.tsfile */
- @Override
- public String upgradeTsFileName(String oldTsFileName) {
- String[] name = oldTsFileName.split(TsFileConstant.TSFILE_SUFFIX);
- return name[0] + "-0" + TsFileConstant.TSFILE_SUFFIX;
- }
-
- /**
- * Due to TsFile version-3 changed the serialize way of integer in TEXT data
and INT32 data with
- * PLAIN encoding, and also add a sum statistic for BOOLEAN data, these
types of data need to
- * decode to points and rewrite in new TsFile.
- */
- protected boolean checkIfNeedToDecode(
- TSDataType dataType,
- TSEncoding encoding,
- PageHeader pageHeader,
- MeasurementSchema schema,
- String deviceId,
- long chunkHeaderOffset)
- throws IllegalPathException {
- return dataType == TSDataType.BOOLEAN
- || dataType == TSDataType.TEXT
- || (dataType == TSDataType.INT32 && encoding == TSEncoding.PLAIN)
- || StorageEngine.getTimePartition(pageHeader.getStartTime())
- != StorageEngine.getTimePartition(pageHeader.getEndTime())
- || super.checkIfNeedToDecode(schema, deviceId, pageHeader,
chunkHeaderOffset);
- }
-
- protected void decodeAndWritePage(
- MeasurementSchema schema,
- ByteBuffer pageData,
- Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
- throws IOException {
- valueDecoder.reset();
- PageReaderV2 pageReader =
- new PageReaderV2(pageData, schema.getType(), valueDecoder,
defaultTimeDecoder, null);
- BatchData batchData = pageReader.getAllSatisfiedPageData();
- rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
- }
-
- /** check if the file to be upgraded has correct magic strings and version
number */
- @Override
- protected boolean fileCheck() throws IOException {
- String magic = reader.readHeadMagic();
- if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
- logger.error("the file's MAGIC STRING is incorrect, file path: {}",
reader.getFileName());
- return false;
- }
-
- String versionNumber = ((TsFileSequenceReaderForV2)
reader).readVersionNumberV2();
- if (!versionNumber.equals(TSFileConfig.VERSION_NUMBER_V2)) {
- logger.error("the file's Version Number is incorrect, file path: {}",
reader.getFileName());
- return false;
- }
-
- if (!reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
- logger.error("the file is not closed correctly, file path: {}",
reader.getFileName());
- return false;
- }
- return true;
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index d33eb47c3d9..a287faa4aec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -47,15 +47,6 @@ public class ModificationUtils {
List<? extends IChunkMetadata> chunkMetaData, List<Modification>
modifications) {
for (IChunkMetadata metaData : chunkMetaData) {
for (Modification modification : modifications) {
- // When the chunkMetadata come from an old TsFile, the method
modification.getFileOffset()
- // is gerVersionNum actually. In this case, we compare the versions of
modification and
- // mataData to determine whether need to do modify.
- if (metaData.isFromOldTsFile()) {
- if (modification.getFileOffset() > metaData.getVersion()) {
- doModifyChunkMetaData(modification, metaData);
- }
- continue;
- }
// The case modification.getFileOffset() ==
metaData.getOffsetOfChunkHeader()
// is not supposed to exist as getFileOffset() is offset containing
full chunk,
// while getOffsetOfChunkHeader() returns the chunk header offset
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
deleted file mode 100644
index 429a1a397e3..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.utils;
-
-import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
-import org.apache.iotdb.db.storageengine.upgrade.UpgradeCheckStatus;
-import org.apache.iotdb.db.storageengine.upgrade.UpgradeLog;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
-import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class UpgradeUtils {
-
- private static final Logger logger =
LoggerFactory.getLogger(UpgradeUtils.class);
- private static final String COMMA_SEPERATOR = ",";
- private static final ReadWriteLock cntUpgradeFileLock = new
ReentrantReadWriteLock();
- private static final ReadWriteLock upgradeLogLock = new
ReentrantReadWriteLock();
-
- private static FSFactory fsFactory = FSFactoryProducer.getFSFactory();
-
- private static Map<String, Integer> upgradeRecoverMap = new HashMap<>();
-
- public static ReadWriteLock getCntUpgradeFileLock() {
- return cntUpgradeFileLock;
- }
-
- public static ReadWriteLock getUpgradeLogLock() {
- return upgradeLogLock;
- }
-
- /** judge whether a tsfile needs to be upgraded */
- public static boolean isNeedUpgrade(TsFileResource tsFileResource) {
- tsFileResource.readLock();
- // case the TsFile's length is equal to 0, the TsFile does not need to be
upgraded
- try {
- if (tsFileResource.getTsFile().length() == 0) {
- return false;
- }
- } finally {
- tsFileResource.readUnlock();
- }
- tsFileResource.readLock();
- try (TsFileSequenceReaderForV2 tsFileSequenceReader =
- new
TsFileSequenceReaderForV2(tsFileResource.getTsFile().getAbsolutePath())) {
- String versionNumber = tsFileSequenceReader.readVersionNumberV2();
- if (versionNumber.equals(TSFileConfig.VERSION_NUMBER_V2)
- || versionNumber.equals(TSFileConfig.VERSION_NUMBER_V1)) {
- return true;
- }
- } catch (IOException e) {
- logger.error(
- "meet error when judge whether file needs to be upgraded, the file's
path:{}",
- tsFileResource.getTsFile().getAbsolutePath(),
- e);
- } finally {
- tsFileResource.readUnlock();
- }
- return false;
- }
-
- public static void moveUpgradedFiles(TsFileResource resource) throws
IOException {
- List<TsFileResource> upgradedResources = resource.getUpgradedResources();
- for (TsFileResource upgradedResource : upgradedResources) {
- File upgradedFile = upgradedResource.getTsFile();
- long partition = upgradedResource.getTimePartition();
- String virtualStorageGroupDir =
upgradedFile.getParentFile().getParentFile().getParent();
- File partitionDir = fsFactory.getFile(virtualStorageGroupDir,
String.valueOf(partition));
- if (!partitionDir.exists()) {
- partitionDir.mkdir();
- }
- // move upgraded TsFile
- if (upgradedFile.exists()) {
- fsFactory.moveFile(upgradedFile, fsFactory.getFile(partitionDir,
upgradedFile.getName()));
- }
- // get temp resource
- File tempResourceFile =
- fsFactory.getFile(upgradedResource.getTsFile().toPath() +
TsFileResource.RESOURCE_SUFFIX);
- // move upgraded mods file
- File newModsFile =
- fsFactory.getFile(upgradedResource.getTsFile().toPath() +
ModificationFile.FILE_SUFFIX);
- if (newModsFile.exists()) {
- fsFactory.moveFile(newModsFile, fsFactory.getFile(partitionDir,
newModsFile.getName()));
- }
- // re-serialize upgraded resource to correct place
- upgradedResource.setFile(fsFactory.getFile(partitionDir,
upgradedFile.getName()));
- if (fsFactory.getFile(partitionDir, newModsFile.getName()).exists()) {
- upgradedResource.getModFile();
- }
- upgradedResource.setStatus(TsFileResourceStatus.NORMAL);
- upgradedResource.serialize();
- // delete generated temp resource file
- Files.delete(tempResourceFile.toPath());
- }
- }
-
- public static boolean isUpgradedFileGenerated(String oldFileName) {
- return upgradeRecoverMap.containsKey(oldFileName)
- && upgradeRecoverMap.get(oldFileName)
- == UpgradeCheckStatus.AFTER_UPGRADE_FILE.getCheckStatusCode();
- }
-
- public static void clearUpgradeRecoverMap() {
- upgradeRecoverMap = null;
- }
-
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- public static void recoverUpgrade() {
- if
(FSFactoryProducer.getFSFactory().getFile(UpgradeLog.getUpgradeLogPath()).exists())
{
- try (BufferedReader upgradeLogReader =
- new BufferedReader(
- new FileReader(
-
FSFactoryProducer.getFSFactory().getFile(UpgradeLog.getUpgradeLogPath())))) {
- String line = null;
- while ((line = upgradeLogReader.readLine()) != null) {
- String oldFilePath = line.split(COMMA_SEPERATOR)[0];
- String oldFileName = new File(oldFilePath).getName();
- if (upgradeRecoverMap.containsKey(oldFileName)) {
- upgradeRecoverMap.put(oldFileName,
upgradeRecoverMap.get(oldFileName) + 1);
- } else {
- upgradeRecoverMap.put(oldFileName, 1);
- }
- }
- } catch (IOException e) {
- logger.error(
- "meet error when recover upgrade process, file path:{}",
- UpgradeLog.getUpgradeLogPath(),
- e);
- } finally {
-
FSFactoryProducer.getFSFactory().getFile(UpgradeLog.getUpgradeLogPath()).delete();
- }
- }
- }
-}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 4763c6f08e6..00ce5475ada 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -221,7 +221,6 @@ public class IoTDBConstant {
public static final String SEQUENCE_FLODER_NAME = "sequence";
public static final String UNSEQUENCE_FLODER_NAME = "unsequence";
public static final String FILE_NAME_SEPARATOR = "-";
- public static final String UPGRADE_FOLDER_NAME = "upgrade";
public static final String CONSENSUS_FOLDER_NAME = "consensus";
public static final String SNAPSHOT_FOLDER_NAME = "snapshot";
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
index e8b25ffef00..dd0eeba51ec 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
@@ -129,11 +129,6 @@ public class AlignedChunkMetadata implements
IChunkMetadata {
return timeChunkMetadata.getEndTime();
}
- @Override
- public boolean isFromOldTsFile() {
- return false;
- }
-
@Override
public IChunkLoader getChunkLoader() {
return chunkLoader;
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 7e6164912c1..00002972dfa 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -64,8 +64,6 @@ public class ChunkMetadata implements IChunkMetadata {
private Statistics<? extends Serializable> statistics;
- private boolean isFromOldTsFile = false;
-
private long ramSize;
private static final int CHUNK_METADATA_FIXED_RAM_SIZE = 93;
@@ -112,7 +110,6 @@ public class ChunkMetadata implements IChunkMetadata {
this.version = other.version;
this.chunkLoader = other.chunkLoader;
this.statistics = other.statistics;
- this.isFromOldTsFile = other.isFromOldTsFile;
this.ramSize = other.ramSize;
this.isSeq = other.isSeq;
this.isClosed = other.isClosed;
@@ -302,14 +299,6 @@ public class ChunkMetadata implements IChunkMetadata {
this.modified = modified;
}
- public boolean isFromOldTsFile() {
- return isFromOldTsFile;
- }
-
- public void setFromOldTsFile(boolean isFromOldTsFile) {
- this.isFromOldTsFile = isFromOldTsFile;
- }
-
public long calculateRamSize() {
long memSize = CHUNK_METADATA_FIXED_RAM_SIZE;
memSize += RamUsageEstimator.sizeOf(tsFilePrefixPath);
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
index e2e654eeb10..51c6c4b55a1 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
@@ -51,8 +51,6 @@ public interface IChunkMetadata {
long getEndTime();
- boolean isFromOldTsFile();
-
IChunkLoader getChunkLoader();
boolean needSetChunkLoader();
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 039257e3bdf..8a8d21dcdae 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@ -35,7 +35,6 @@ public class Chunk {
private ChunkHeader chunkHeader;
private Statistics chunkStatistic;
private ByteBuffer chunkData;
- private boolean isFromOldFile = false;
/** A list of deleted intervals. */
private List<TimeRange> deleteIntervalList;
@@ -149,12 +148,4 @@ public class Chunk {
public Statistics getChunkStatistic() {
return chunkStatistic;
}
-
- public boolean isFromOldFile() {
- return isFromOldFile;
- }
-
- public void setFromOldFile(boolean isFromOldFile) {
- this.isFromOldFile = isFromOldFile;
- }
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index 1feaeffc8a4..da73339cd1e 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -36,8 +36,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.v2.file.header.PageHeaderV2;
-import org.apache.iotdb.tsfile.v2.read.reader.page.PageReaderV2;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -75,11 +73,7 @@ public class ChunkReader implements IChunkReader {
this.currentTimestamp = Long.MIN_VALUE;
chunkHeader = chunk.getHeader();
this.unCompressor =
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
- if (chunk.isFromOldFile()) {
- initAllPageReadersV2();
- } else {
- initAllPageReaders(chunk.getChunkStatistic());
- }
+ initAllPageReaders(chunk.getChunkStatistic());
}
/**
@@ -93,11 +87,7 @@ public class ChunkReader implements IChunkReader {
this.currentTimestamp = currentTimestamp;
chunkHeader = chunk.getHeader();
this.unCompressor =
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
- if (chunk.isFromOldFile()) {
- initAllPageReadersV2();
- } else {
- initAllPageReaders(chunk.getChunkStatistic());
- }
+ initAllPageReaders(chunk.getChunkStatistic());
}
/**
@@ -283,48 +273,4 @@ public class ChunkReader implements IChunkReader {
public List<IPageReader> loadPageReaderList() {
return pageReaderList;
}
-
- // For reading TsFile V2
- private void initAllPageReadersV2() throws IOException {
- // construct next satisfied page header
- while (chunkDataBuffer.remaining() > 0) {
- // deserialize a PageHeader from chunkDataBuffer
- PageHeader pageHeader =
- PageHeaderV2.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
- // if the current page satisfies
- if (pageSatisfied(pageHeader)) {
- pageReaderList.add(constructPageReaderForNextPageV2(pageHeader));
- } else {
- skipBytesInStreamByLength(pageHeader.getCompressedSize());
- }
- }
- }
-
- // For reading TsFile V2
- private PageReader constructPageReaderForNextPageV2(PageHeader pageHeader)
throws IOException {
- int compressedPageBodyLength = pageHeader.getCompressedSize();
- byte[] compressedPageBody = new byte[compressedPageBodyLength];
-
- // doesn't has a complete page body
- if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
- throw new IOException(
- "do not has a complete page body. Expected:"
- + compressedPageBodyLength
- + ". Actual:"
- + chunkDataBuffer.remaining());
- }
-
- chunkDataBuffer.get(compressedPageBody);
- Decoder valueDecoder =
- Decoder.getDecoderByType(chunkHeader.getEncodingType(),
chunkHeader.getDataType());
- byte[] uncompressedPageData = new byte[pageHeader.getUncompressedSize()];
- unCompressor.uncompress(
- compressedPageBody, 0, compressedPageBodyLength, uncompressedPageData,
0);
- ByteBuffer pageData = ByteBuffer.wrap(uncompressedPageData);
- PageReader reader =
- new PageReaderV2(
- pageHeader, pageData, chunkHeader.getDataType(), valueDecoder,
timeDecoder, filter);
- reader.setDeleteIntervalList(deleteIntervalList);
- return reader;
- }
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/footer/ChunkGroupFooterV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/footer/ChunkGroupFooterV2.java
deleted file mode 100644
index 05dd5bc09a1..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/footer/ChunkGroupFooterV2.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.tsfile.v2.file.footer;
-
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-public class ChunkGroupFooterV2 {
-
- private ChunkGroupFooterV2() {}
-
- /**
- * deserialize from inputStream.
- *
- * @param markerRead Whether the marker of the CHUNK_GROUP_FOOTER is read
ahead.
- */
- public static ChunkGroupHeader deserializeFrom(InputStream inputStream,
boolean markerRead)
- throws IOException {
- if (!markerRead) {
- byte marker = (byte) inputStream.read();
- if (marker != MetaMarker.CHUNK_GROUP_HEADER) {
- MetaMarker.handleUnexpectedMarker(marker);
- }
- }
-
- String deviceID = ReadWriteIOUtils.readString(inputStream);
- // dataSize
- ReadWriteIOUtils.readLong(inputStream);
- // numOfChunks
- ReadWriteIOUtils.readInt(inputStream);
- return new ChunkGroupHeader(deviceID);
- }
-
- /**
- * deserialize from TsFileInput.
- *
- * @param markerRead Whether the marker of the CHUNK_GROUP_FOOTER is read
ahead.
- */
- public static ChunkGroupHeader deserializeFrom(TsFileInput input, long
offset, boolean markerRead)
- throws IOException {
- long offsetVar = offset;
- if (!markerRead) {
- offsetVar++;
- }
- ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
- input.read(buffer, offsetVar);
- buffer.flip();
- int size = buffer.getInt();
- offsetVar += Integer.BYTES;
- buffer = ByteBuffer.allocate(getSerializedSize(size));
- ReadWriteIOUtils.readAsPossible(input, offsetVar, buffer);
- buffer.flip();
- String deviceID = ReadWriteIOUtils.readStringWithLength(buffer, size);
- // dataSize
- ReadWriteIOUtils.readLong(buffer);
- // numOfChunks
- ReadWriteIOUtils.readInt(buffer);
- return new ChunkGroupHeader(deviceID);
- }
-
- private static int getSerializedSize(int deviceIdLength) {
- return deviceIdLength + Long.BYTES + Integer.BYTES;
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/ChunkHeaderV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/ChunkHeaderV2.java
deleted file mode 100644
index 82cad8cf6f7..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/ChunkHeaderV2.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.file.header;
-
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-public class ChunkHeaderV2 {
-
- private ChunkHeaderV2() {}
-
- /**
- * deserialize from inputStream.
- *
- * @param markerRead Whether the marker of the CHUNK_HEADER has been read
- */
- public static ChunkHeader deserializeFrom(InputStream inputStream, boolean
markerRead)
- throws IOException {
- if (!markerRead) {
- byte marker = (byte) inputStream.read();
- if (marker != MetaMarker.CHUNK_HEADER) {
- MetaMarker.handleUnexpectedMarker(marker);
- }
- }
-
- String measurementID = ReadWriteIOUtils.readString(inputStream);
- int dataSize = ReadWriteIOUtils.readInt(inputStream);
- TSDataType dataType = TSDataType.deserialize((byte)
ReadWriteIOUtils.readShort(inputStream));
- int numOfPages = ReadWriteIOUtils.readInt(inputStream);
- CompressionType type =
- CompressionType.deserialize((byte)
ReadWriteIOUtils.readShort(inputStream));
- TSEncoding encoding = TSEncoding.deserialize((byte)
ReadWriteIOUtils.readShort(inputStream));
- return new ChunkHeader(measurementID, dataSize, dataType, type, encoding,
numOfPages);
- }
-
- /**
- * deserialize from TsFileInput.
- *
- * @param input TsFileInput
- * @param offset offset
- * @param chunkHeaderSize the size of chunk's header
- * @param markerRead read marker (boolean type)
- * @return CHUNK_HEADER object
- * @throws IOException IOException
- */
- public static ChunkHeader deserializeFrom(
- TsFileInput input, long offset, int chunkHeaderSize, boolean markerRead)
throws IOException {
- long offsetVar = offset;
- if (!markerRead) {
- offsetVar++;
- }
-
- // read chunk header from input to buffer
- ByteBuffer buffer = ByteBuffer.allocate(chunkHeaderSize);
- input.read(buffer, offsetVar);
- buffer.flip();
-
- // read measurementID
- int size = buffer.getInt();
- String measurementID = ReadWriteIOUtils.readStringWithLength(buffer, size);
- int dataSize = ReadWriteIOUtils.readInt(buffer);
- TSDataType dataType = TSDataType.deserialize((byte)
ReadWriteIOUtils.readShort(buffer));
- // numOfPages
- ReadWriteIOUtils.readInt(buffer);
- CompressionType type = CompressionType.deserialize((byte)
ReadWriteIOUtils.readShort(buffer));
- TSEncoding encoding = TSEncoding.deserialize((byte)
ReadWriteIOUtils.readShort(buffer));
- return new ChunkHeader(
- MetaMarker.CHUNK_HEADER,
- measurementID,
- dataSize,
- chunkHeaderSize,
- dataType,
- type,
- encoding);
- }
-
- public static int getSerializedSize(String measurementID) {
- return Byte.BYTES // marker
- + Integer.BYTES // measurementID length
- + measurementID.getBytes(TSFileConfig.STRING_CHARSET).length //
measurementID
- + Integer.BYTES // dataSize
- + Short.BYTES // dataType
- + Short.BYTES // compressionType
- + Short.BYTES // encodingType
- + Integer.BYTES; // numOfPages
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/PageHeaderV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/PageHeaderV2.java
deleted file mode 100644
index 6de0aba3501..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/header/PageHeaderV2.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.file.header;
-
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.v2.file.metadata.statistics.StatisticsV2;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-
-public class PageHeaderV2 {
-
- private PageHeaderV2() {}
-
- public static PageHeader deserializeFrom(InputStream inputStream, TSDataType
dataType)
- throws IOException {
- int uncompressedSize = ReadWriteIOUtils.readInt(inputStream);
- int compressedSize = ReadWriteIOUtils.readInt(inputStream);
- Statistics<? extends Serializable> statistics =
StatisticsV2.deserialize(inputStream, dataType);
- return new PageHeader(uncompressedSize, compressedSize, statistics);
- }
-
- public static PageHeader deserializeFrom(ByteBuffer buffer, TSDataType
dataType) {
- int uncompressedSize = ReadWriteIOUtils.readInt(buffer);
- int compressedSize = ReadWriteIOUtils.readInt(buffer);
- Statistics<? extends Serializable> statistics =
StatisticsV2.deserialize(buffer, dataType);
- return new PageHeader(uncompressedSize, compressedSize, statistics);
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/ChunkMetadataV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/ChunkMetadataV2.java
deleted file mode 100644
index 2b329d45565..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/ChunkMetadataV2.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.file.metadata;
-
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.v2.file.metadata.statistics.StatisticsV2;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-
-public class ChunkMetadataV2 {
-
- private ChunkMetadataV2() {}
-
- /**
- * deserialize from ByteBuffer.
- *
- * @param buffer ByteBuffer
- * @return ChunkMetaData object
- */
- public static ChunkMetadata deserializeFrom(ByteBuffer buffer) {
-
- String measurementUid = ReadWriteIOUtils.readString(buffer);
- long offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
- TSDataType tsDataType = TSDataType.deserialize((byte)
ReadWriteIOUtils.readShort(buffer));
-
- Statistics<? extends Serializable> statistics =
StatisticsV2.deserialize(buffer, tsDataType);
- ChunkMetadata chunkMetaData =
- new ChunkMetadata(measurementUid, tsDataType, offsetOfChunkHeader,
statistics);
- chunkMetaData.setFromOldTsFile(true);
- return chunkMetaData;
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
deleted file mode 100644
index a179c1deed1..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexEntryV2.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.file.metadata;
-
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.nio.ByteBuffer;
-
-public class MetadataIndexEntryV2 {
-
- private MetadataIndexEntryV2() {}
-
- public static MetadataIndexEntry deserializeFrom(ByteBuffer buffer) {
- String name = ReadWriteIOUtils.readString(buffer);
- long offset = ReadWriteIOUtils.readLong(buffer);
- return new MetadataIndexEntry(name, offset);
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java
deleted file mode 100644
index a2493852083..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/MetadataIndexNodeV2.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.file.metadata;
-
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class MetadataIndexNodeV2 {
-
- private MetadataIndexNodeV2() {}
-
- public static MetadataIndexNode deserializeFrom(ByteBuffer buffer) {
- int size = ReadWriteIOUtils.readInt(buffer);
- List<MetadataIndexEntry> children = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- children.add(MetadataIndexEntryV2.deserializeFrom(buffer));
- }
- long offset = ReadWriteIOUtils.readLong(buffer);
- MetadataIndexNodeType nodeType =
- MetadataIndexNodeType.deserialize(ReadWriteIOUtils.readByte(buffer));
- return new MetadataIndexNode(children, offset, nodeType);
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TimeseriesMetadataV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TimeseriesMetadataV2.java
deleted file mode 100644
index accb4833500..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TimeseriesMetadataV2.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.file.metadata;
-
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.v2.file.metadata.statistics.StatisticsV2;
-
-import java.nio.ByteBuffer;
-
-public class TimeseriesMetadataV2 {
-
- private TimeseriesMetadataV2() {}
-
- public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer) {
- TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
- timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readString(buffer));
- timeseriesMetaData.setTsDataType(
- TSDataType.deserialize((byte) ReadWriteIOUtils.readShort(buffer)));
-
timeseriesMetaData.setOffsetOfChunkMetaDataList(ReadWriteIOUtils.readLong(buffer));
-
timeseriesMetaData.setDataSizeOfChunkMetaDataList(ReadWriteIOUtils.readInt(buffer));
- timeseriesMetaData.setStatistics(
- StatisticsV2.deserialize(buffer, timeseriesMetaData.getTsDataType()));
- return timeseriesMetaData;
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TsFileMetadataV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TsFileMetadataV2.java
deleted file mode 100644
index 2ebd12e7a70..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/TsFileMetadataV2.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.file.metadata;
-
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
-import org.apache.iotdb.tsfile.utils.BloomFilter;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-public class TsFileMetadataV2 {
-
- private TsFileMetadataV2() {}
-
- /**
- * deserialize data from the buffer.
- *
- * @param buffer -buffer use to deserialize
- * @return -a pair of TsFileMetaData and VersionInfo
- */
- public static Pair<TsFileMetadata, List<Pair<Long, Long>>>
deserializeFrom(ByteBuffer buffer) {
- TsFileMetadata fileMetaData = new TsFileMetadata();
-
- List<Pair<Long, Long>> versionInfo = new ArrayList<>();
- // metadataIndex
- fileMetaData.setMetadataIndex(MetadataIndexNodeV2.deserializeFrom(buffer));
- // totalChunkNum
- ReadWriteIOUtils.readInt(buffer);
- // invalidChunkNum
- ReadWriteIOUtils.readInt(buffer);
-
- // versionInfo
- int versionSize = ReadWriteIOUtils.readInt(buffer);
- for (int i = 0; i < versionSize; i++) {
- long versionPos = ReadWriteIOUtils.readLong(buffer);
- long version = ReadWriteIOUtils.readLong(buffer);
- versionInfo.add(new Pair<>(versionPos, version));
- }
-
- // metaOffset
- long metaOffset = ReadWriteIOUtils.readLong(buffer);
- fileMetaData.setMetaOffset(metaOffset);
-
- // read bloom filter
- if (buffer.hasRemaining()) {
- int byteLength = ReadWriteIOUtils.readInt(buffer);
- byte[] bytes = new byte[byteLength];
- buffer.get(bytes);
- int filterSize = ReadWriteIOUtils.readInt(buffer);
- int hashFunctionSize = ReadWriteIOUtils.readInt(buffer);
- fileMetaData.setBloomFilter(
- BloomFilter.buildBloomFilter(bytes, filterSize, hashFunctionSize));
- }
-
- return new Pair<>(fileMetaData, versionInfo);
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/statistics/StatisticsV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/statistics/StatisticsV2.java
deleted file mode 100644
index 24b06f7fd5a..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/file/metadata/statistics/StatisticsV2.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.file.metadata.statistics;
-
-import org.apache.iotdb.tsfile.exception.write.UnknownColumnTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics;
-import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-
-public class StatisticsV2 {
-
- private StatisticsV2() {}
-
- public static Statistics<? extends Serializable> deserialize(
- InputStream inputStream, TSDataType dataType) throws IOException {
- Statistics<? extends Serializable> statistics =
Statistics.getStatsByType(dataType);
- statistics.setCount((int) ReadWriteIOUtils.readLong(inputStream));
- statistics.setStartTime(ReadWriteIOUtils.readLong(inputStream));
- statistics.setEndTime(ReadWriteIOUtils.readLong(inputStream));
- switch (dataType) {
- case BOOLEAN:
- boolean firstBool = ReadWriteIOUtils.readBool(inputStream);
- boolean lastBool = ReadWriteIOUtils.readBool(inputStream);
- ((BooleanStatistics) statistics).initializeStats(firstBool, lastBool,
0);
- break;
- case INT32:
- int minValue = ReadWriteIOUtils.readInt(inputStream);
- int maxValue = ReadWriteIOUtils.readInt(inputStream);
- int firstValue = ReadWriteIOUtils.readInt(inputStream);
- int lastValue = ReadWriteIOUtils.readInt(inputStream);
- long sumValue = (long) ReadWriteIOUtils.readDouble(inputStream);
- ((IntegerStatistics) statistics)
- .initializeStats(minValue, maxValue, firstValue, lastValue,
sumValue);
- break;
- case INT64:
- case TEXT:
- case DOUBLE:
- case FLOAT:
- statistics.deserialize(inputStream);
- break;
- default:
- throw new UnknownColumnTypeException(dataType.toString());
- }
- statistics.setEmpty(false);
- return statistics;
- }
-
- public static Statistics<? extends Serializable> deserialize(
- ByteBuffer buffer, TSDataType dataType) {
- Statistics<? extends Serializable> statistics =
Statistics.getStatsByType(dataType);
- statistics.setCount((int) ReadWriteIOUtils.readLong(buffer));
- statistics.setStartTime(ReadWriteIOUtils.readLong(buffer));
- statistics.setEndTime(ReadWriteIOUtils.readLong(buffer));
- switch (dataType) {
- case BOOLEAN:
- boolean firstBool = ReadWriteIOUtils.readBool(buffer);
- boolean lastBool = ReadWriteIOUtils.readBool(buffer);
- ((BooleanStatistics) statistics).initializeStats(firstBool, lastBool,
0);
- break;
- case INT32:
- int minValue = ReadWriteIOUtils.readInt(buffer);
- int maxValue = ReadWriteIOUtils.readInt(buffer);
- int firstValue = ReadWriteIOUtils.readInt(buffer);
- int lastValue = ReadWriteIOUtils.readInt(buffer);
- long sumValue = (long) ReadWriteIOUtils.readDouble(buffer);
- ((IntegerStatistics) statistics)
- .initializeStats(minValue, maxValue, firstValue, lastValue,
sumValue);
- break;
- case INT64:
- case TEXT:
- case DOUBLE:
- case FLOAT:
- statistics.deserialize(buffer);
- break;
- default:
- throw new UnknownColumnTypeException(dataType.toString());
- }
- statistics.setEmpty(false);
- return statistics;
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
deleted file mode 100644
index 00536ca1dc7..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.read;
-
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
-import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.v2.file.footer.ChunkGroupFooterV2;
-import org.apache.iotdb.tsfile.v2.file.header.ChunkHeaderV2;
-import org.apache.iotdb.tsfile.v2.file.header.PageHeaderV2;
-import org.apache.iotdb.tsfile.v2.file.metadata.ChunkMetadataV2;
-import org.apache.iotdb.tsfile.v2.file.metadata.MetadataIndexNodeV2;
-import org.apache.iotdb.tsfile.v2.file.metadata.TimeseriesMetadataV2;
-import org.apache.iotdb.tsfile.v2.file.metadata.TsFileMetadataV2;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
-
-public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements
AutoCloseable {
-
- private List<Pair<Long, Long>> versionInfo;
-
- /**
- * Create a file reader of the given file. The reader will read the tail of
the file to get the
- * file metadata size.Then the reader will skip the first
- * TSFileConfig.MAGIC_STRING.getBytes().length +
TSFileConfig.NUMBER_VERSION.getBytes().length
- * bytes of the file for preparing reading real data.
- *
- * @param file the data file
- * @throws IOException If some I/O error occurs
- */
- public TsFileSequenceReaderForV2(String file) throws IOException {
- this(file, true);
- }
-
- /**
- * construct function for TsFileSequenceReader.
- *
- * @param file -given file name
- * @param loadMetadataSize -whether load meta data size
- */
- public TsFileSequenceReaderForV2(String file, boolean loadMetadataSize)
throws IOException {
- super(file, loadMetadataSize);
- }
-
- /**
- * Create a file reader of the given file. The reader will read the tail of
the file to get the
- * file metadata size.Then the reader will skip the first
- * TSFileConfig.MAGIC_STRING.getBytes().length +
TSFileConfig.NUMBER_VERSION.getBytes().length
- * bytes of the file for preparing reading real data.
- *
- * @param input given input
- */
- public TsFileSequenceReaderForV2(TsFileInput input) throws IOException {
- this(input, true);
- }
-
- /**
- * construct function for TsFileSequenceReader.
- *
- * @param input -given input
- * @param loadMetadataSize -load meta data size
- */
- public TsFileSequenceReaderForV2(TsFileInput input, boolean
loadMetadataSize) throws IOException {
- super(input, loadMetadataSize);
- }
-
- /**
- * construct function for TsFileSequenceReader.
- *
- * @param input the input of a tsfile. The current position should be a
markder and then a chunk
- * Header, rather than the magic number
- * @param fileMetadataPos the position of the file metadata in the
TsFileInput from the beginning
- * of the input to the current position
- * @param fileMetadataSize the byte size of the file metadata in the input
- */
- public TsFileSequenceReaderForV2(TsFileInput input, long fileMetadataPos,
int fileMetadataSize) {
- super(input, fileMetadataPos, fileMetadataSize);
- this.fileMetadataPos = fileMetadataPos;
- this.fileMetadataSize = fileMetadataSize;
- }
-
- /** whether the file is a complete TsFile: only if the head magic and tail
magic string exists. */
- @Override
- public boolean isComplete() throws IOException {
- return tsFileInput.size()
- >= TSFileConfig.MAGIC_STRING.getBytes().length * 2
- + TSFileConfig.VERSION_NUMBER_V2.getBytes().length
- && (readTailMagic().equals(readHeadMagic()));
- }
-
- /** this function reads version number and checks compatibility of TsFile. */
- public String readVersionNumberV2() throws IOException {
- ByteBuffer versionNumberBytes =
- ByteBuffer.allocate(TSFileConfig.VERSION_NUMBER_V2.getBytes().length);
- tsFileInput.read(versionNumberBytes,
TSFileConfig.MAGIC_STRING.getBytes().length);
- versionNumberBytes.flip();
- return new String(versionNumberBytes.array());
- }
-
- /**
- * this function does not modify the position of the file reader.
- *
- * @throws IOException io error
- */
- @Override
- public TsFileMetadata readFileMetadata() throws IOException {
- if (tsFileMetaData == null || versionInfo == null) {
- Pair<TsFileMetadata, List<Pair<Long, Long>>> pair =
- TsFileMetadataV2.deserializeFrom(readData(fileMetadataPos,
fileMetadataSize));
- tsFileMetaData = pair.left;
- versionInfo = pair.right;
- }
- return tsFileMetaData;
- }
-
- @Override
- public TimeseriesMetadata readTimeseriesMetadata(Path path, boolean
ignoreNotExists)
- throws IOException {
- readFileMetadata();
- MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getMetadataIndex();
- Pair<MetadataIndexEntry, Long> metadataIndexPair =
- getMetadataAndEndOffsetV2(
- deviceMetadataIndexNode, path.getDevice(),
MetadataIndexNodeType.INTERNAL_DEVICE, true);
- if (metadataIndexPair == null) {
- if (ignoreNotExists) {
- return null;
- }
- return null;
- }
- ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
- MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
- if
(!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
{
- metadataIndexNode = MetadataIndexNodeV2.deserializeFrom(buffer);
- metadataIndexPair =
- getMetadataAndEndOffsetV2(
- metadataIndexNode,
- path.getMeasurement(),
- MetadataIndexNodeType.INTERNAL_MEASUREMENT,
- false);
- }
- if (metadataIndexPair == null) {
- return null;
- }
- List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
- buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
- while (buffer.hasRemaining()) {
- TimeseriesMetadata timeseriesMetadata =
TimeseriesMetadataV2.deserializeFrom(buffer);
- ArrayList<ChunkMetadata> chunkMetadataList =
readChunkMetaDataList(timeseriesMetadata);
- timeseriesMetadata.setChunkMetadataList(chunkMetadataList);
- timeseriesMetadataList.add(timeseriesMetadata);
- }
- // return null if path does not exist in the TsFile
- int searchResult =
- binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
path.getMeasurement());
- return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
- }
-
- /*Find the leaf node that contains path, return all the sensors in that leaf
node which are also
- in allSensors set */
- @SuppressWarnings("squid:S3776")
- @Override
- public List<TimeseriesMetadata> readTimeseriesMetadata(Path path,
Set<String> allSensors)
- throws IOException {
- readFileMetadata();
- MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getMetadataIndex();
- Pair<MetadataIndexEntry, Long> metadataIndexPair =
- getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(),
true, true);
- if (metadataIndexPair == null) {
- return Collections.emptyList();
- }
- ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
- MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
- if
(!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
{
- metadataIndexNode = MetadataIndexNodeV2.deserializeFrom(buffer);
- metadataIndexPair =
- getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(),
false, false);
- }
- if (metadataIndexPair == null) {
- return Collections.emptyList();
- }
- List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
- buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
- while (buffer.hasRemaining()) {
- TimeseriesMetadata timeseriesMetadata;
- timeseriesMetadata = TimeseriesMetadataV2.deserializeFrom(buffer);
- ArrayList<ChunkMetadata> chunkMetadataList =
readChunkMetaDataList(timeseriesMetadata);
- timeseriesMetadata.setChunkMetadataList(chunkMetadataList);
- if (allSensors.contains(timeseriesMetadata.getMeasurementId())) {
- timeseriesMetadataList.add(timeseriesMetadata);
- }
- }
- return timeseriesMetadataList;
- }
-
- @SuppressWarnings("squid:S3776")
- @Override
- public List<ITimeSeriesMetadata> readITimeseriesMetadata(String device,
Set<String> measurements)
- throws IOException {
- readFileMetadata();
- MetadataIndexNode deviceMetadataIndexNode =
tsFileMetaData.getMetadataIndex();
- Pair<MetadataIndexEntry, Long> metadataIndexPair =
- getMetadataAndEndOffsetV2(
- deviceMetadataIndexNode, device,
MetadataIndexNodeType.INTERNAL_DEVICE, false);
- if (metadataIndexPair == null) {
- return Collections.emptyList();
- }
- List<ITimeSeriesMetadata> resultTimeseriesMetadataList = new ArrayList<>();
- List<String> measurementList = new ArrayList<>(measurements);
- Set<String> measurementsHadFound = new HashSet<>();
- for (int i = 0; i < measurementList.size(); i++) {
- if (measurementsHadFound.contains(measurementList.get(i))) {
- continue;
- }
- ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
- Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair =
metadataIndexPair;
- List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
- MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
- if
(!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT))
{
- metadataIndexNode = MetadataIndexNodeV2.deserializeFrom(buffer);
- measurementMetadataIndexPair =
- getMetadataAndEndOffsetV2(
- metadataIndexNode,
- measurementList.get(i),
- MetadataIndexNodeType.INTERNAL_MEASUREMENT,
- false);
- }
- if (measurementMetadataIndexPair == null) {
- return Collections.emptyList();
- }
- buffer =
- readData(
- measurementMetadataIndexPair.left.getOffset(),
measurementMetadataIndexPair.right);
- while (buffer.hasRemaining()) {
- TimeseriesMetadata timeseriesMetadata =
TimeseriesMetadataV2.deserializeFrom(buffer);
- ArrayList<ChunkMetadata> chunkMetadataList =
readChunkMetaDataList(timeseriesMetadata);
- timeseriesMetadata.setChunkMetadataList(chunkMetadataList);
- timeseriesMetadataList.add(timeseriesMetadata);
- }
- for (int j = i; j < measurementList.size(); j++) {
- String current = measurementList.get(j);
- if (!measurementsHadFound.contains(current)) {
- int searchResult =
binarySearchInTimeseriesMetadataList(timeseriesMetadataList, current);
- if (searchResult >= 0) {
-
resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
- measurementsHadFound.add(current);
- }
- }
- if (measurementsHadFound.size() == measurements.size()) {
- return resultTimeseriesMetadataList;
- }
- }
- }
- return resultTimeseriesMetadataList;
- }
-
- @Override
- public List<String> getAllDevices() throws IOException {
- if (tsFileMetaData == null) {
- readFileMetadata();
- }
- return getAllDevicesV2(tsFileMetaData.getMetadataIndex());
- }
-
- private List<String> getAllDevicesV2(MetadataIndexNode metadataIndexNode)
throws IOException {
- List<String> deviceList = new ArrayList<>();
- int metadataIndexListSize = metadataIndexNode.getChildren().size();
- if
(metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT))
{
- for (MetadataIndexEntry index : metadataIndexNode.getChildren()) {
- deviceList.add(index.getName());
- }
- } else {
- for (int i = 0; i < metadataIndexListSize; i++) {
- long endOffset = metadataIndexNode.getEndOffset();
- if (i != metadataIndexListSize - 1) {
- endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
- }
- ByteBuffer buffer =
readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
- MetadataIndexNode node = MetadataIndexNodeV2.deserializeFrom(buffer);
- if (node.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
- // if node in next level is LEAF_DEVICE, put all devices in node
entry into the set
- deviceList.addAll(
- node.getChildren().stream()
- .map(MetadataIndexEntry::getName)
- .collect(Collectors.toList()));
- } else {
- // keep traversing
- deviceList.addAll(getAllDevicesV2(node));
- }
- }
- }
- return deviceList;
- }
-
- /**
- * read all ChunkMetaDatas of given device
- *
- * @param device name
- * @return measurement -> ChunkMetadata list
- * @throws IOException io error
- */
- @Override
- public Map<String, List<ChunkMetadata>> readChunkMetadataInDevice(String
device)
- throws IOException {
- if (tsFileMetaData == null) {
- readFileMetadata();
- }
-
- long start = 0;
- int size = 0;
- List<TimeseriesMetadata> timeseriesMetadataMap =
getDeviceTimeseriesMetadataV2(device);
- for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap) {
- if (start == 0) {
- start = timeseriesMetadata.getOffsetOfChunkMetaDataList();
- }
- size += timeseriesMetadata.getDataSizeOfChunkMetaDataList();
- }
- // read buffer of all ChunkMetadatas of this device
- ByteBuffer buffer = readData(start, size);
- Map<String, List<ChunkMetadata>> seriesMetadata = new HashMap<>();
- while (buffer.hasRemaining()) {
- ChunkMetadata chunkMetadata = ChunkMetadataV2.deserializeFrom(buffer);
- seriesMetadata
- .computeIfAbsent(chunkMetadata.getMeasurementUid(), key -> new
ArrayList<>())
- .add(chunkMetadata);
- }
- // set version in ChunkMetadata
- for (Entry<String, List<ChunkMetadata>> entry : seriesMetadata.entrySet())
{
- applyVersion(entry.getValue());
- }
- return seriesMetadata;
- }
-
- /**
- * Traverse the metadata index from MetadataIndexEntry to get
TimeseriesMetadatas
- *
- * @param metadataIndex MetadataIndexEntry
- * @param buffer byte buffer
- * @param deviceId String
- * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list
- */
- private void generateMetadataIndexV2(
- MetadataIndexEntry metadataIndex,
- ByteBuffer buffer,
- String deviceId,
- MetadataIndexNodeType type,
- Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap)
- throws IOException {
- switch (type) {
- case INTERNAL_DEVICE:
- case LEAF_DEVICE:
- case INTERNAL_MEASUREMENT:
- deviceId = metadataIndex.getName();
- MetadataIndexNode metadataIndexNode =
MetadataIndexNodeV2.deserializeFrom(buffer);
- int metadataIndexListSize = metadataIndexNode.getChildren().size();
- for (int i = 0; i < metadataIndexListSize; i++) {
- long endOffset = metadataIndexNode.getEndOffset();
- if (i != metadataIndexListSize - 1) {
- endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
- }
- ByteBuffer nextBuffer =
- readData(metadataIndexNode.getChildren().get(i).getOffset(),
endOffset);
- generateMetadataIndexV2(
- metadataIndexNode.getChildren().get(i),
- nextBuffer,
- deviceId,
- metadataIndexNode.getNodeType(),
- timeseriesMetadataMap);
- }
- break;
- case LEAF_MEASUREMENT:
- List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
- while (buffer.hasRemaining()) {
- TimeseriesMetadata timeseriesMetadata =
TimeseriesMetadataV2.deserializeFrom(buffer);
- ArrayList<ChunkMetadata> chunkMetadataList =
readChunkMetaDataList(timeseriesMetadata);
- timeseriesMetadata.setChunkMetadataList(chunkMetadataList);
- timeseriesMetadataList.add(timeseriesMetadata);
- }
- timeseriesMetadataMap
- .computeIfAbsent(deviceId, k -> new ArrayList<>())
- .addAll(timeseriesMetadataList);
- break;
- }
- }
-
- @Override
- public Map<String, List<TimeseriesMetadata>>
getAllTimeseriesMetadata(boolean needChunkMetadata)
- throws IOException {
- if (tsFileMetaData == null) {
- readFileMetadata();
- }
- Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new
HashMap<>();
- MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
- List<MetadataIndexEntry> metadataIndexEntryList =
metadataIndexNode.getChildren();
- for (int i = 0; i < metadataIndexEntryList.size(); i++) {
- MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
- long endOffset = tsFileMetaData.getMetadataIndex().getEndOffset();
- if (i != metadataIndexEntryList.size() - 1) {
- endOffset = metadataIndexEntryList.get(i + 1).getOffset();
- }
- ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
- generateMetadataIndexV2(
- metadataIndexEntry, buffer, null, metadataIndexNode.getNodeType(),
timeseriesMetadataMap);
- }
- return timeseriesMetadataMap;
- }
-
- private List<TimeseriesMetadata> getDeviceTimeseriesMetadataV2(String
device) throws IOException {
- MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
- Pair<MetadataIndexEntry, Long> metadataIndexPair =
- getMetadataAndEndOffsetV2(
- metadataIndexNode, device, MetadataIndexNodeType.INTERNAL_DEVICE,
true);
- if (metadataIndexPair == null) {
- return Collections.emptyList();
- }
- ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(),
metadataIndexPair.right);
- Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new
TreeMap<>();
- generateMetadataIndexV2(
- metadataIndexPair.left,
- buffer,
- device,
- MetadataIndexNodeType.INTERNAL_MEASUREMENT,
- timeseriesMetadataMap);
- List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>();
- for (List<TimeseriesMetadata> timeseriesMetadataList :
timeseriesMetadataMap.values()) {
- deviceTimeseriesMetadata.addAll(timeseriesMetadataList);
- }
- return deviceTimeseriesMetadata;
- }
-
- /**
- * Get target MetadataIndexEntry and its end offset
- *
- * @param metadataIndex given MetadataIndexNode
- * @param name target device / measurement name
- * @param type target MetadataIndexNodeType, either INTERNAL_DEVICE or
INTERNAL_MEASUREMENT. When
- * searching for a device node, return when it is not INTERNAL_DEVICE.
Likewise, when
- * searching for a measurement node, return when it is not
INTERNAL_MEASUREMENT. This works
- * for the situation when the index tree does NOT have the device level
and ONLY has the
- * measurement level.
- * @param exactSearch if is in exact search mode, return null when there is
no entry with name; or
- * else return the nearest MetadataIndexEntry before it (for deeper
search)
- * @return target MetadataIndexEntry, endOffset pair
- */
- private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffsetV2(
- MetadataIndexNode metadataIndex, String name, MetadataIndexNodeType
type, boolean exactSearch)
- throws IOException {
- if (!metadataIndex.getNodeType().equals(type)) {
- return metadataIndex.getChildIndexEntry(name, exactSearch);
- } else {
- Pair<MetadataIndexEntry, Long> childIndexEntry =
- metadataIndex.getChildIndexEntry(name, false);
- ByteBuffer buffer = readData(childIndexEntry.left.getOffset(),
childIndexEntry.right);
- return getMetadataAndEndOffsetV2(
- MetadataIndexNodeV2.deserializeFrom(buffer), name, type, false);
- }
- }
-
- /**
- * read data from current position of the input, and deserialize it to a
CHUNK_GROUP_FOOTER. <br>
- * This method is not threadsafe.
- *
- * @return a CHUNK_GROUP_FOOTER
- * @throws IOException io error
- */
- public ChunkGroupHeader readChunkGroupFooter() throws IOException {
- return ChunkGroupFooterV2.deserializeFrom(tsFileInput.wrapAsInputStream(),
true);
- }
-
- /**
- * read data from current position of the input, and deserialize it to a
CHUNK_HEADER. <br>
- * This method is not threadsafe.
- *
- * @return a CHUNK_HEADER
- * @throws IOException io error
- */
- public ChunkHeader readChunkHeader() throws IOException {
- return ChunkHeaderV2.deserializeFrom(tsFileInput.wrapAsInputStream(),
true);
- }
-
- /**
- * read the chunk's header.
- *
- * @param position the file offset of this chunk's header
- * @param chunkHeaderSize the size of chunk's header
- * @param markerRead true if the offset does not contains the marker ,
otherwise false
- */
- private ChunkHeader readChunkHeader(long position, int chunkHeaderSize,
boolean markerRead)
- throws IOException {
- return ChunkHeaderV2.deserializeFrom(tsFileInput, position,
chunkHeaderSize, markerRead);
- }
-
- /**
- * notice, this function will modify channel's position.
- *
- * @param dataSize the size of chunkdata
- * @param position the offset of the chunk data
- * @return the pages of this chunk
- */
- private ByteBuffer readChunkV2(long position, int dataSize) throws
IOException {
- return readData(position, dataSize);
- }
-
- /**
- * read memory chunk.
- *
- * @param metaData -given chunk meta data
- * @return -chunk
- */
- @Override
- public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
- int chunkHeadSize =
ChunkHeaderV2.getSerializedSize(metaData.getMeasurementUid());
- ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(),
chunkHeadSize, false);
- ByteBuffer buffer =
- readChunkV2(
- metaData.getOffsetOfChunkHeader() + header.getSerializedSize(),
header.getDataSize());
- Chunk chunk =
- new Chunk(header, buffer, metaData.getDeleteIntervalList(),
metaData.getStatistics());
- chunk.setFromOldFile(true);
- return chunk;
- }
-
- /**
- * not thread safe.
- *
- * @param type given tsfile data type
- */
- public PageHeader readPageHeader(TSDataType type) throws IOException {
- return PageHeaderV2.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
- }
-
- public long readVersion() throws IOException {
- ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
- if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {
- throw new IOException("reach the end of the file.");
- }
- buffer.flip();
- return buffer.getLong();
- }
-
- /**
- * get ChunkMetaDatas in given TimeseriesMetaData
- *
- * @return List of ChunkMetaData
- */
- public ArrayList<ChunkMetadata> readChunkMetaDataList(TimeseriesMetadata
timeseriesMetaData)
- throws IOException {
- readFileMetadata();
- ArrayList<ChunkMetadata> chunkMetadataList = new ArrayList<>();
- long startOffsetOfChunkMetadataList =
timeseriesMetaData.getOffsetOfChunkMetaDataList();
- int dataSizeOfChunkMetadataList =
timeseriesMetaData.getDataSizeOfChunkMetaDataList();
-
- ByteBuffer buffer = readData(startOffsetOfChunkMetadataList,
dataSizeOfChunkMetadataList);
- while (buffer.hasRemaining()) {
- chunkMetadataList.add(ChunkMetadataV2.deserializeFrom(buffer));
- }
-
- // minimize the storage of an ArrayList instance.
- chunkMetadataList.trimToSize();
- applyVersion(chunkMetadataList);
- return chunkMetadataList;
- }
-
- private void applyVersion(List<ChunkMetadata> chunkMetadataList) {
- if (versionInfo == null || versionInfo.isEmpty()) {
- return;
- }
- int versionIndex = 0;
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
-
- while (chunkMetadata.getOffsetOfChunkHeader() >=
versionInfo.get(versionIndex).left) {
- versionIndex++;
- }
-
- chunkMetadata.setVersion(versionInfo.get(versionIndex).right);
- }
- }
-}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/reader/page/PageReaderV2.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/reader/page/PageReaderV2.java
deleted file mode 100644
index e6eba157834..00000000000
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/reader/page/PageReaderV2.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.v2.read.reader.page;
-
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.encoding.decoder.PlainDecoder;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.utils.Binary;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class PageReaderV2 extends PageReader {
-
- public PageReaderV2(
- ByteBuffer pageData,
- TSDataType dataType,
- Decoder valueDecoder,
- Decoder timeDecoder,
- Filter filter) {
- this(null, pageData, dataType, valueDecoder, timeDecoder, filter);
- }
-
- public PageReaderV2(
- PageHeader pageHeader,
- ByteBuffer pageData,
- TSDataType dataType,
- Decoder valueDecoder,
- Decoder timeDecoder,
- Filter filter) {
- super(pageHeader, pageData, dataType, valueDecoder, timeDecoder, filter);
- }
-
- /** @return the returned BatchData may be empty, but never be null */
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- @Override
- public BatchData getAllSatisfiedPageData(boolean ascending) throws
IOException {
-
- if (dataType != TSDataType.INT32 && dataType != TSDataType.TEXT) {
- return super.getAllSatisfiedPageData(ascending);
- }
-
- BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending,
false);
-
- while (timeDecoder.hasNext(timeBuffer)) {
- long timestamp = timeDecoder.readLong(timeBuffer);
- switch (dataType) {
- case INT32:
- int anInt =
- (valueDecoder instanceof PlainDecoder)
- ? valueBuffer.getInt()
- : valueDecoder.readInt(valueBuffer);
- if (!isDeleted(timestamp) && (filter == null ||
filter.satisfy(timestamp, anInt))) {
- pageData.putInt(timestamp, anInt);
- }
- break;
- case TEXT:
- int length = valueBuffer.getInt();
- byte[] buf = new byte[length];
- valueBuffer.get(buf, 0, buf.length);
- Binary aBinary = new Binary(buf);
- if (!isDeleted(timestamp) && (filter == null ||
filter.satisfy(timestamp, aBinary))) {
- pageData.putBinary(timestamp, aBinary);
- }
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
- }
- }
- return pageData.flip();
- }
-}