This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 3402f6f7d37 add memory calculation for insertRow
3402f6f7d37 is described below
commit 3402f6f7d37c7561745e2b32bea723c1e90882f8
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 8 14:32:48 2024 +0800
add memory calculation for insertRow
---
.../db/storageengine/dataregion/DataRegion.java | 277 ++++++++++++++-------
.../dataregion/memtable/AbstractMemTable.java | 2 +-
.../dataregion/memtable/IMemTable.java | 2 +-
.../dataregion/memtable/TsFileProcessor.java | 205 +++++++++------
4 files changed, 316 insertions(+), 170 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index cf751e63f44..08034b5022e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
@@ -214,13 +215,19 @@ public class DataRegion implements IDataRegionForQuery {
*/
private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
- /** Condition to safely delete data region. */
+ /**
+ * Condition to safely delete data region.
+ */
private final Condition deletedCondition =
insertLock.writeLock().newCondition();
- /** Data region has been deleted or not. */
+ /**
+ * Data region has been deleted or not.
+ */
private volatile boolean deleted = false;
- /** closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done. */
+ /**
+ * closeStorageGroupCondition is used to wait for all currently closing
TsFiles to be done.
+ */
private final Object closeStorageGroupCondition = new Object();
/**
@@ -228,32 +235,50 @@ public class DataRegion implements IDataRegionForQuery {
*/
private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
- /** time partition id in the database -> {@link TsFileProcessor} for this
time partition. */
+ /**
+ * time partition id in the database -> {@link TsFileProcessor} for this
time partition.
+ */
private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors =
new TreeMap<>();
- /** time partition id in the database -> {@link TsFileProcessor} for this
time partition. */
+ /**
+ * time partition id in the database -> {@link TsFileProcessor} for this
time partition.
+ */
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors
= new TreeMap<>();
- /** sequence {@link TsFileProcessor}s which are closing. */
+ /**
+ * sequence {@link TsFileProcessor}s which are closing.
+ */
private final Set<TsFileProcessor> closingSequenceTsFileProcessor =
ConcurrentHashMap.newKeySet();
- /** unsequence {@link TsFileProcessor}s which are closing. */
+ /**
+ * unsequence {@link TsFileProcessor}s which are closing.
+ */
private final Set<TsFileProcessor> closingUnSequenceTsFileProcessor =
ConcurrentHashMap.newKeySet();
- /** data region id. */
+ /**
+ * data region id.
+ */
private final String dataRegionId;
- /** database name. */
+ /**
+ * database name.
+ */
private final String databaseName;
- /** database system directory. */
+ /**
+ * database system directory.
+ */
private File storageGroupSysDir;
- /** manage seqFileList and unSeqFileList. */
+ /**
+ * manage seqFileList and unSeqFileList.
+ */
private final TsFileManager tsFileManager;
- /** manage tsFileResource degrade. */
+ /**
+ * manage tsFileResource degrade.
+ */
private final TsFileResourceManager tsFileResourceManager =
TsFileResourceManager.getInstance();
/**
@@ -264,10 +289,14 @@ public class DataRegion implements IDataRegionForQuery {
private final HashMap<Long, VersionController>
timePartitionIdVersionControllerMap =
new HashMap<>();
- /** file system factory (local or hdfs). */
+ /**
+ * file system factory (local or hdfs).
+ */
private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
- /** File flush policy. */
+ /**
+ * File flush policy.
+ */
private TsFileFlushPolicy fileFlushPolicy;
/**
@@ -278,16 +307,24 @@ public class DataRegion implements IDataRegionForQuery {
*/
private Map<Long, Long> partitionMaxFileVersions = new ConcurrentHashMap<>();
- /** database info for mem control. */
+ /**
+ * database info for mem control.
+ */
private final DataRegionInfo dataRegionInfo = new DataRegionInfo(this);
- /** whether it's ready from recovery. */
+ /**
+ * whether it's ready from recovery.
+ */
private boolean isReady = false;
- /** close file listeners. */
+ /**
+ * close file listeners.
+ */
private List<CloseFileListener> customCloseFileListeners =
Collections.emptyList();
- /** flush listeners. */
+ /**
+ * flush listeners.
+ */
private List<FlushListener> customFlushListeners = Collections.emptyList();
private ILastFlushTimeMap lastFlushTimeMap;
@@ -311,10 +348,10 @@ public class DataRegion implements IDataRegionForQuery {
/**
* Construct a database processor.
*
- * @param systemDir system dir path
- * @param dataRegionId data region id e.g. 1
+ * @param systemDir system dir path
+ * @param dataRegionId data region id e.g. 1
* @param fileFlushPolicy file flush policy
- * @param databaseName database name e.g. root.sg1
+ * @param databaseName database name e.g. root.sg1
*/
public DataRegion(
String systemDir, String dataRegionId, TsFileFlushPolicy
fileFlushPolicy, String databaseName)
@@ -397,19 +434,29 @@ public class DataRegion implements IDataRegionForQuery {
return ret;
}
- /** this class is used to store recovering context. */
+ /**
+ * this class is used to store recovering context.
+ */
private class DataRegionRecoveryContext {
- /** number of files to be recovered. */
+ /**
+ * number of files to be recovered.
+ */
private final long numOfFilesToRecover;
- /** number of already recovered files. */
+ /**
+ * number of already recovered files.
+ */
private long recoveredFilesNum;
- /** last recovery log time. */
+ /**
+ * last recovery log time.
+ */
private long lastLogTime;
- /** recover performers of unsealed TsFiles. */
+ /**
+ * recover performers of unsealed TsFiles.
+ */
private final List<UnsealedTsFileRecoverPerformer> recoverPerformers = new
ArrayList<>();
public DataRegionRecoveryContext(long numOfFilesToRecover) {
@@ -441,7 +488,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** recover from file */
+ /**
+ * recover from file
+ */
@SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
private void recover() throws DataRegionException {
try {
@@ -711,7 +760,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** check if the tsfile's time is smaller than system current time. */
+ /**
+ * check if the tsfile's time is smaller than system current time.
+ */
private void checkTsFileTime(File tsFile, long currentTime) throws
DataRegionException {
String[] items = tsFile.getName().replace(TSFILE_SUFFIX,
"").split(FILE_NAME_SEPARATOR);
long fileTime = Long.parseLong(items[0]);
@@ -724,7 +775,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** submit unsealed TsFile to WALRecoverManager. */
+ /**
+ * submit unsealed TsFile to WALRecoverManager.
+ */
private WALRecoverListener recoverUnsealedTsFile(
TsFileResource unsealedTsFile, DataRegionRecoveryContext context,
boolean isSeq) {
UnsealedTsFileRecoverPerformer recoverPerformer =
@@ -809,7 +862,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** recover sealed TsFile. */
+ /**
+ * recover sealed TsFile.
+ */
private void recoverSealedTsFiles(
TsFileResource sealedTsFile, DataRegionRecoveryContext context, boolean
isSeq) {
try (SealedTsFileRecoverPerformer recoverPerformer =
@@ -900,7 +955,7 @@ public class DataRegion implements IDataRegionForQuery {
boolean isSequence =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
// insert to sequence or unSequence file
TsFileProcessor tsFileProcessor =
@@ -945,13 +1000,13 @@ public class DataRegion implements IDataRegionForQuery {
// a new partition, insert the remaining of the previous partition
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode,
- before,
- loc,
- isSequence,
- results,
- beforeTimePartition,
- noFailure)
+ insertTabletNode,
+ before,
+ loc,
+ isSequence,
+ results,
+ beforeTimePartition,
+ noFailure)
&& noFailure;
before = loc;
beforeTimePartition = timePartitionId;
@@ -961,13 +1016,13 @@ public class DataRegion implements IDataRegionForQuery {
// insert previous range into unsequence
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode,
- before,
- loc,
- isSequence,
- results,
- beforeTimePartition,
- noFailure)
+ insertTabletNode,
+ before,
+ loc,
+ isSequence,
+ results,
+ beforeTimePartition,
+ noFailure)
&& noFailure;
before = loc;
isSequence = true;
@@ -980,13 +1035,13 @@ public class DataRegion implements IDataRegionForQuery {
if (before < loc) {
noFailure =
insertTabletToTsFileProcessor(
- insertTabletNode,
- before,
- loc,
- isSequence,
- results,
- beforeTimePartition,
- noFailure)
+ insertTabletNode,
+ before,
+ loc,
+ isSequence,
+ results,
+ beforeTimePartition,
+ noFailure)
&& noFailure;
}
@@ -1096,11 +1151,11 @@ public class DataRegion implements IDataRegionForQuery {
* subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5,
null, 5}
*
* @param insertTabletNode insert a tablet of a device
- * @param sequence whether is sequence
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
- * @param timePartitionId time partition id
+ * @param sequence whether is sequence
+ * @param start start index of rows to be inserted in
insertTabletPlan
+ * @param end end index of rows to be inserted in
insertTabletPlan
+ * @param results result array
+ * @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true
otherwise
*/
private boolean insertTabletToTsFileProcessor(
@@ -1161,7 +1216,7 @@ public class DataRegion implements IDataRegionForQuery {
private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
||
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
- && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
+ && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
// disable updating last cache on follower
return;
}
@@ -1185,7 +1240,8 @@ public class DataRegion implements IDataRegionForQuery {
node.getMeasurementSchemas(),
node.isAligned(),
node::composeLastTimeValuePair,
- index -> node.getColumns()[index] != null,
+ index -> node.getColumns()[index] != null &&
(node.getColumnCategories() == null
+ || node.getColumnCategories()[index] ==
TsTableColumnCategory.MEASUREMENT),
true,
latestFlushedTime);
}
@@ -1238,7 +1294,8 @@ public class DataRegion implements IDataRegionForQuery {
node.getMeasurementSchemas(),
node.isAligned(),
node::composeTimeValuePair,
- index -> node.getValues()[index] != null,
+ index -> node.getValues()[index] != null &&
(node.getColumnCategories() == null
+ || node.getColumnCategories()[index] ==
TsTableColumnCategory.MEASUREMENT),
true,
latestFlushedTime);
}
@@ -1442,9 +1499,9 @@ public class DataRegion implements IDataRegionForQuery {
/**
* get processor from hashmap, flush oldest processor if necessary
*
- * @param timeRangeId time partition range
+ * @param timeRangeId time partition range
* @param tsFileProcessorTreeMap tsFileProcessorTreeMap
- * @param sequence whether is sequence or not
+ * @param sequence whether is sequence or not
*/
private TsFileProcessor getOrCreateTsFileProcessorIntern(
long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
boolean sequence)
@@ -1528,7 +1585,7 @@ public class DataRegion implements IDataRegionForQuery {
/**
* close one tsfile processor
*
- * @param sequence whether this tsfile processor is sequence or not
+ * @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor
tsFileProcessor) {
@@ -1560,7 +1617,7 @@ public class DataRegion implements IDataRegionForQuery {
/**
* close one tsfile processor, thread-safety should be ensured by caller
*
- * @param sequence whether this tsfile processor is sequence or not
+ * @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public Future<?> asyncCloseOneTsFileProcessor(boolean sequence,
TsFileProcessor tsFileProcessor) {
@@ -1625,7 +1682,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** close all tsfile resource */
+ /**
+ * close all tsfile resource
+ */
public void closeAllResources() {
for (TsFileResource tsFileResource : tsFileManager.getTsFileList(false)) {
try {
@@ -1643,7 +1702,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** delete tsfile */
+ /**
+ * delete tsfile
+ */
public void syncDeleteDataFiles() throws TsFileProcessorException {
logger.info(
"{} will close all files for deleting data files", databaseName + "-"
+ dataRegionId);
@@ -1753,7 +1814,9 @@ public class DataRegion implements IDataRegionForQuery {
WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId,
count);
}
- /** This method will be blocked until all tsfile processors are closed. */
+ /**
+ * This method will be blocked until all tsfile processors are closed.
+ */
public void syncCloseAllWorkingTsFileProcessors() {
try {
List<Future<?>> tsFileProcessorsClosingFutures =
asyncCloseAllWorkingTsFileProcessors();
@@ -1792,7 +1855,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** close all working tsfile processors */
+ /**
+ * close all working tsfile processors
+ */
List<Future<?>> asyncCloseAllWorkingTsFileProcessors() {
writeLock("asyncCloseAllWorkingTsFileProcessors");
List<Future<?>> futures = new ArrayList<>();
@@ -1814,7 +1879,9 @@ public class DataRegion implements IDataRegionForQuery {
return futures;
}
- /** force close all working tsfile processors */
+ /**
+ * force close all working tsfile processors
+ */
public void forceCloseAllWorkingTsFileProcessors() throws
TsFileProcessorException {
writeLock("forceCloseAllWorkingTsFileProcessors");
try {
@@ -1835,7 +1902,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** used for queryengine */
+ /**
+ * used for queryengine
+ */
@Override
public QueryDataSource query(
List<IFullPath> pathList,
@@ -1999,7 +2068,9 @@ public class DataRegion implements IDataRegionForQuery {
return fileScanHandles;
}
- /** lock the read lock of the insert lock */
+ /**
+ * lock the read lock of the insert lock
+ */
@Override
public void readLock() {
// apply read lock for SG insert lock to prevent inconsistent with
concurrently writing memtable
@@ -2008,20 +2079,26 @@ public class DataRegion implements IDataRegionForQuery {
tsFileManager.readLock();
}
- /** unlock the read lock of insert lock */
+ /**
+ * unlock the read lock of insert lock
+ */
@Override
public void readUnlock() {
tsFileManager.readUnlock();
insertLock.readLock().unlock();
}
- /** lock the write lock of the insert lock */
+ /**
+ * lock the write lock of the insert lock
+ */
public void writeLock(String holder) {
insertLock.writeLock().lock();
insertWriteLockHolder = holder;
}
- /** unlock the write lock of the insert lock */
+ /**
+ * unlock the write lock of the insert lock
+ */
public void writeUnlock() {
insertWriteLockHolder = "";
insertLock.writeLock().unlock();
@@ -2071,7 +2148,9 @@ public class DataRegion implements IDataRegionForQuery {
return tsfileResourcesForQuery;
}
- /** Seperate tsfiles in TsFileManager to sealedList and unsealedList. */
+ /**
+ * Seperate tsfiles in TsFileManager to sealedList and unsealedList.
+ */
private void separateTsFile(
List<TsFileResource> sealedResource,
List<TsFileResource> unsealedResource,
@@ -2501,7 +2580,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** Put the memtable back to the MemTablePool and make the metadata in
writer visible */
+ /**
+ * Put the memtable back to the MemTablePool and make the metadata in writer
visible
+ */
// TODO please consider concurrency with read and insert method.
private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor
tsFileProcessor)
throws TsFileProcessorException {
@@ -2597,7 +2678,9 @@ public class DataRegion implements IDataRegionForQuery {
return trySubmitCount;
}
- /** Schedule settle compaction for ttl check. */
+ /**
+ * Schedule settle compaction for ttl check.
+ */
public int executeTTLCheck() throws InterruptedException {
while (!isCompactionSelecting.compareAndSet(false, true)) {
// wait until success
@@ -2726,7 +2809,9 @@ public class DataRegion implements IDataRegionForQuery {
return getNonSystemDatabaseName(databaseName);
}
- /** Merge file under this database processor */
+ /**
+ * Merge file under this database processor
+ */
public int compact() {
writeLock("merge");
CompactionScheduler.exclusiveLockCompactionSelection();
@@ -2746,7 +2831,7 @@ public class DataRegion implements IDataRegionForQuery {
* <p>Then, update the latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
*
* @param newTsFileResource tsfile resource @UsedBy load external tsfile
module
- * @param deleteOriginFile whether to delete origin tsfile
+ * @param deleteOriginFile whether to delete origin tsfile
* @param isGeneratedByPipe whether the load tsfile request is generated by
pipe
*/
public void loadNewTsFile(
@@ -2932,8 +3017,8 @@ public class DataRegion implements IDataRegionForQuery {
}
/**
- * Update latest time in latestTimeForEachDevice and
- * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load
external tsfile module.
+ * Update latest time in latestTimeForEachDevice and
partitionLatestFlushedTimeForEachDevice.
+ * @UsedBy sync module, load external tsfile module.
*/
protected void updateLastFlushTime(TsFileResource newTsFileResource) {
for (IDeviceID device : newTsFileResource.getDevices()) {
@@ -2951,8 +3036,8 @@ public class DataRegion implements IDataRegionForQuery {
/**
* Execute the loading process by the type.
*
- * @param tsFileResource tsfile resource to be loaded
- * @param filePartitionId the partition id of the new file
+ * @param tsFileResource tsfile resource to be loaded
+ * @param filePartitionId the partition id of the new file
* @param deleteOriginFile whether to delete the original file
* @return load the file successfully @UsedBy sync module, load external
tsfile module.
*/
@@ -3195,14 +3280,14 @@ public class DataRegion implements IDataRegionForQuery {
* "tsFileResource" have the same plan indexes as the local one.
*
* @return true if any file contains plans with indexes no less than the max
plan index of
- * "tsFileResource", otherwise false.
+ * "tsFileResource", otherwise false.
*/
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long
partitionNum) {
// examine working processor first as they have the largest plan index
return isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
|| isFileAlreadyExistInWorking(
- tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
+ tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getSequenceFileList())
|| isFileAlreadyExistInClosed(tsFileResource, partitionNum,
getUnSequenceFileList());
}
@@ -3326,7 +3411,7 @@ public class DataRegion implements IDataRegionForQuery {
boolean isSequence =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(timePartitionId,
insertRowNode.getDeviceID());
TsFileProcessor tsFileProcessor =
getOrCreateTsFileProcessor(timePartitionId, isSequence);
if (tsFileProcessor == null) {
continue;
@@ -3435,8 +3520,8 @@ public class DataRegion implements IDataRegionForQuery {
areSequence[i] =
config.isEnableSeparateData()
&& insertRowNode.getTime()
- > lastFlushTimeMap.getFlushedTime(
- timePartitionIds[i], insertRowNode.getDeviceID());
+ > lastFlushTimeMap.getFlushedTime(
+ timePartitionIds[i], insertRowNode.getDeviceID());
}
insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
if (!insertRowsNode.getResults().isEmpty()) {
@@ -3500,7 +3585,7 @@ public class DataRegion implements IDataRegionForQuery {
}
/**
- * @param folder the folder's path
+ * @param folder the folder's path
* @param diskSize the disk space occupied by this folder, unit is MB
*/
private void countFolderDiskSize(String folder, AtomicLong diskSize) {
@@ -3610,7 +3695,9 @@ public class DataRegion implements IDataRegionForQuery {
return insertWriteLockHolder;
}
- /** This method could only be used in iot consensus */
+ /**
+ * This method could only be used in iot consensus
+ */
public IWALNode getWALNode() {
if
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
throw new UnsupportedOperationException();
@@ -3620,7 +3707,9 @@ public class DataRegion implements IDataRegionForQuery {
.applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
}
- /** Wait for this data region successfully deleted */
+ /**
+ * Wait for this data region successfully deleted
+ */
public void waitForDeleted() {
writeLock("waitForDeleted");
try {
@@ -3636,7 +3725,9 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- /** Release all threads waiting for this data region successfully deleted */
+ /**
+ * Release all threads waiting for this data region successfully deleted
+ */
public void markDeleted() {
writeLock("markDeleted");
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index cd6d5aa7c20..7ecd348a2ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -480,7 +480,7 @@ public abstract class AbstractMemTable implements IMemTable
{
}
@Override
- public boolean checkIfChunkDoesNotExist(IDeviceID deviceId, String
measurement) {
+ public boolean chunkNotExist(IDeviceID deviceId, String measurement) {
IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
if (null == memChunkGroup) {
return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index be5febcb0e6..4de1520a22b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -177,7 +177,7 @@ public interface IMemTable extends WALEntryValue {
void release();
/** must guarantee the device exists in the work memtable only used when mem
control enabled */
- boolean checkIfChunkDoesNotExist(IDeviceID deviceId, String measurement);
+ boolean chunkNotExist(IDeviceID deviceId, String measurement);
/** only used when mem control enabled */
long getCurrentTVListSize(IDeviceID deviceId, String measurement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index c16f03424a6..2e060319700 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -118,36 +118,56 @@ import static
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORK
@SuppressWarnings("java:S1135") // ignore todos
public class TsFileProcessor {
- /** Logger fot this class. */
+ /**
+ * Logger fot this class.
+ */
private static final Logger logger =
LoggerFactory.getLogger(TsFileProcessor.class);
private static final int NUM_MEM_TO_ESTIMATE = 3;
- /** Storage group name of this tsfile. */
+ /**
+ * Storage group name of this tsfile.
+ */
private final String storageGroupName;
- /** IoTDB config. */
+ /**
+ * IoTDB config.
+ */
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- /** Database info for mem control. */
+ /**
+ * Database info for mem control.
+ */
private final DataRegionInfo dataRegionInfo;
- /** Tsfile processor info for mem control. */
+ /**
+ * Tsfile processor info for mem control.
+ */
private TsFileProcessorInfo tsFileProcessorInfo;
- /** Sync this object in read() and asyncTryToFlush(). */
+ /**
+ * Sync this object in read() and asyncTryToFlush().
+ */
private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new
ConcurrentLinkedDeque<>();
- /** Modification to memtable mapping. */
+ /**
+ * Modification to memtable mapping.
+ */
private final List<Pair<Modification, IMemTable>> modsToMemtable = new
ArrayList<>();
- /** Writer for restore tsfile and flushing. */
+ /**
+ * Writer for restore tsfile and flushing.
+ */
private RestorableTsFileIOWriter writer;
- /** Tsfile resource for index this tsfile. */
+ /**
+ * Tsfile resource for index this tsfile.
+ */
private final TsFileResource tsFileResource;
- /** Time range index to indicate this processor belongs to which time range
*/
+ /**
+ * Time range index to indicate this processor belongs to which time range
+ */
private long timeRangeId;
/**
@@ -155,7 +175,9 @@ public class TsFileProcessor {
*/
private volatile boolean managedByFlushManager;
- /** A lock to mutual exclude read and read */
+ /**
+ * A lock to mutual exclude read and read
+ */
private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
/**
@@ -164,32 +186,48 @@ public class TsFileProcessor {
*/
private volatile boolean shouldClose;
- /** Working memtable. */
+ /**
+ * Working memtable.
+ */
private IMemTable workMemTable;
- /** Last flush time to flush the working memtable. */
+ /**
+ * Last flush time to flush the working memtable.
+ */
private long lastWorkMemtableFlushTime;
- /** This callback is called before the workMemtable is added into the
flushingMemTables. */
+ /**
+ * This callback is called before the workMemtable is added into the
flushingMemTables.
+ */
private final DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback;
- /** Wal node. */
+ /**
+ * Wal node.
+ */
private final IWALNode walNode;
- /** Whether it's a sequence file or not. */
+ /**
+ * Whether it's a sequence file or not.
+ */
private final boolean sequence;
- /** Total memtable size for mem control. */
+ /**
+ * Total memtable size for mem control.
+ */
private long totalMemTableSize;
private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get
flushQueryLock write lock";
private static final String FLUSH_QUERY_WRITE_RELEASE =
"{}: {} get flushQueryLock write lock released";
- /** Close file listener. */
+ /**
+ * Close file listener.
+ */
private final List<CloseFileListener> closeFileListeners = new
CopyOnWriteArrayList<>();
- /** Flush file listener. */
+ /**
+ * Flush file listener.
+ */
private final List<FlushListener> flushListeners = new ArrayList<>();
private final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
@@ -278,7 +316,8 @@ public class TsFileProcessor {
memIncrements =
checkAlignedMemCostAndAddToTspInfoForRow(
insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
- insertRowNode.getDataTypes(), insertRowNode.getValues());
+ insertRowNode.getDataTypes(), insertRowNode.getValues(),
+ insertRowNode.getColumnCategories());
} else {
memIncrements =
checkMemCostAndAddToTspInfoForRow(
@@ -487,9 +526,9 @@ public class TsFileProcessor {
* non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
*
* @param insertTabletNode insert a tablet of a device
- * @param start start index of rows to be inserted in insertTabletPlan
- * @param end end index of rows to be inserted in insertTabletPlan
- * @param results result array
+ * @param start start index of rows to be inserted in
insertTabletPlan
+ * @param end end index of rows to be inserted in
insertTabletPlan
+ * @param results result array
*/
public void insertTablet(
InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results, boolean noFailure)
@@ -590,7 +629,7 @@ public class TsFileProcessor {
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
- if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])) {
+ if (workMemTable.chunkNotExist(deviceId, measurements[i])) {
// ChunkMetadataIncrement
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
@@ -608,7 +647,7 @@ public class TsFileProcessor {
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
- return new long[] {memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
+ return new long[]{memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
}
@SuppressWarnings("squid:S3776") // High Cognitive Complexity
@@ -630,9 +669,9 @@ public class TsFileProcessor {
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
- if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
+ if (workMemTable.chunkNotExist(deviceId, measurements[i])
&& (!increasingMemTableInfo.containsKey(deviceId)
- ||
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
+ ||
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
// ChunkMetadataIncrement
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
@@ -659,35 +698,26 @@ public class TsFileProcessor {
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
- return new long[] {memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
+ return new long[]{memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
private long[] checkAlignedMemCostAndAddToTspInfoForRow(
- IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes,
Object[] values)
+ IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes,
Object[] values,
+ TsTableColumnCategory[] columnCategories)
throws WriteProcessException {
// Memory of increased PrimitiveArray and TEXT values, e.g., add a
long[128], add 128*8
long memTableIncrement = 0L;
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
- if (workMemTable.checkIfChunkDoesNotExist(deviceId,
AlignedPath.VECTOR_PLACEHOLDER)) {
+ if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
// For new device of this mem table
// ChunkMetadataIncrement
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER,
TSDataType.VECTOR)
* dataTypes.length;
- memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes,
null);
- for (int i = 0; i < dataTypes.length; i++) {
- // Skip failed Measurements
- if (dataTypes[i] == null || measurements[i] == null) {
- continue;
- }
- // TEXT data mem size
- if (dataTypes[i].isBinary() && values[i] != null) {
- textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
- }
- }
+ memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes,
columnCategories);
} else {
// For existed device of this mem table
AlignedWritableMemChunk alignedMemChunk =
@@ -696,30 +726,35 @@ public class TsFileProcessor {
List<TSDataType> dataTypesInTVList = new ArrayList<>();
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
- if (dataTypes[i] == null || measurements[i] == null) {
+ if (dataTypes[i] == null || measurements[i] == null ||
(columnCategories != null
+ && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
continue;
}
- // Extending the column of aligned mem chunk
+ // add arrays for new columns
if (!alignedMemChunk.containsMeasurement(measurements[i])) {
+ int currentArrayNum = alignedMemChunk.alignedListSize() /
PrimitiveArrayManager.ARRAY_SIZE +
+ (alignedMemChunk.alignedListSize() %
PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); // round up to whole arrays
memTableIncrement +=
- (alignedMemChunk.alignedListSize() /
PrimitiveArrayManager.ARRAY_SIZE + 1)
- * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+ currentArrayNum *
AlignedTVList.valueListArrayMemCost(dataTypes[i]);
dataTypesInTVList.add(dataTypes[i]);
}
- // TEXT data mem size
- if (dataTypes[i].isBinary() && values[i] != null) {
- textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
- }
}
- // Here currentChunkPointNum >= 1
+ // this insertion will result in a new array
if ((alignedMemChunk.alignedListSize() %
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
dataTypesInTVList.addAll(((AlignedTVList)
alignedMemChunk.getTVList()).getTsDataTypes());
memTableIncrement +=
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
}
}
+
+ for (int i = 0; i < dataTypes.length; i++) {
+ // TEXT data mem size
+ if (dataTypes[i] != null && dataTypes[i].isBinary() && values[i] !=
null) {
+ textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+ }
+ }
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
- return new long[] {memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
+ return new long[]{memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@@ -736,7 +771,7 @@ public class TsFileProcessor {
TSDataType[] dataTypes = insertRowNode.getDataTypes();
Object[] values = insertRowNode.getValues();
String[] measurements = insertRowNode.getMeasurements();
- if (workMemTable.checkIfChunkDoesNotExist(deviceId,
AlignedPath.VECTOR_PLACEHOLDER)
+ if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)
&& !increasingMemTableInfo.containsKey(deviceId)) {
// For new device of this mem table
// ChunkMetadataIncrement
@@ -803,7 +838,7 @@ public class TsFileProcessor {
}
}
updateMemoryInfo(memTableIncrement, chunkMetadataIncrement,
textDataIncrement);
- return new long[] {memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
+ return new long[]{memTableIncrement, textDataIncrement,
chunkMetadataIncrement};
}
private long[] checkMemCostAndAddToTspInfoForTablet(
@@ -815,7 +850,7 @@ public class TsFileProcessor {
int end)
throws WriteProcessException {
if (start >= end) {
- return new long[] {0, 0, 0};
+ return new long[]{0, 0, 0};
}
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
@@ -845,7 +880,7 @@ public class TsFileProcessor {
TSStatus[] results)
throws WriteProcessException {
if (start >= end) {
- return new long[] {0, 0, 0};
+ return new long[]{0, 0, 0};
}
long[] memIncrements = new long[3]; // memTable, text, chunk metadata
@@ -877,7 +912,7 @@ public class TsFileProcessor {
Object column) {
// memIncrements = [memTable, text, chunk metadata] respectively
- if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurement)) {
+ if (workMemTable.chunkNotExist(deviceId, measurement)) {
// ChunkMetadataIncrement
memIncrements[2] += ChunkMetadata.calculateRamSize(measurement,
dataType);
memIncrements[0] +=
@@ -940,7 +975,7 @@ public class TsFileProcessor {
}
// memIncrements = [memTable, text, chunk metadata] respectively
- if (workMemTable.checkIfChunkDoesNotExist(deviceId,
AlignedPath.VECTOR_PLACEHOLDER)) {
+ if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
// new devices introduce new ChunkMetadata
// ChunkMetadata memory Increment
memIncrements[2] +=
@@ -950,8 +985,8 @@ public class TsFileProcessor {
int numArraysToAdd =
incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE
- + incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE
- > 0
+ + incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE
+ > 0
? 1
: 0;
memIncrements[0] +=
@@ -971,7 +1006,7 @@ public class TsFileProcessor {
|| column == null
|| measurement == null
|| (columnCategories != null
- && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
+ && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
continue;
}
@@ -987,14 +1022,14 @@ public class TsFileProcessor {
// calculate how many new arrays will be added after this insertion
int currentArrayCnt =
currentPointNum / PrimitiveArrayManager.ARRAY_SIZE
- + currentPointNum % PrimitiveArrayManager.ARRAY_SIZE
- > 0
+ + currentPointNum % PrimitiveArrayManager.ARRAY_SIZE
+ > 0
? 1
: 0;
int newArrayCnt =
newPointNum / PrimitiveArrayManager.ARRAY_SIZE
- + newPointNum % PrimitiveArrayManager.ARRAY_SIZE
- > 0
+ + newPointNum % PrimitiveArrayManager.ARRAY_SIZE
+ > 0
? 1
: 0;
long acquireArray = newArrayCnt - currentArrayCnt;
@@ -1016,7 +1051,7 @@ public class TsFileProcessor {
|| column == null
|| measurement == null
|| (columnCategories != null
- && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
+ && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
continue;
}
@@ -1168,7 +1203,9 @@ public class TsFileProcessor {
logger.info("File {} is closed synchronously",
tsFileResource.getTsFile().getAbsolutePath());
}
- /** async close one tsfile, register and close it by another thread */
+ /**
+ * async close one tsfile, register and close it by another thread
+ */
public Future<?> asyncClose() {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -1294,7 +1331,9 @@ public class TsFileProcessor {
}
}
- /** Put the working memtable into flushing list and set the working memtable
to null */
+ /**
+ * Put the working memtable into flushing list and set the working memtable
to null
+ */
public void asyncFlush() {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -1372,7 +1411,9 @@ public class TsFileProcessor {
return FlushManager.getInstance().registerTsFileProcessor(this);
}
- /** Put back the memtable to MemTablePool and make metadata in writer
visible */
+ /**
+ * Put back the memtable to MemTablePool and make metadata in writer visible
+ */
private void releaseFlushedMemTable(IMemTable memTable) {
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
@@ -1429,7 +1470,9 @@ public class TsFileProcessor {
}
}
- /** This method will synchronize the memTable and release its flushing
resources */
+ /**
+ * This method will synchronize the memTable and release its flushing
resources
+ */
private void syncReleaseFlushedMemTable(IMemTable memTable) {
synchronized (flushingMemTables) {
releaseFlushedMemTable(memTable);
@@ -1664,7 +1707,9 @@ public class TsFileProcessor {
}
}
- /** end file and write some meta */
+ /**
+ * end file and write some meta
+ */
private void endFile() throws IOException, TsFileProcessorException {
if (logger.isDebugEnabled()) {
logger.debug("Start to end file {}", tsFileResource);
@@ -1686,7 +1731,9 @@ public class TsFileProcessor {
writer = null;
}
- /** End empty file and remove it from file system */
+ /**
+ * End empty file and remove it from file system
+ */
private void endEmptyFile() throws TsFileProcessorException, IOException {
logger.info("Start to end empty file {}", tsFileResource);
@@ -1714,7 +1761,9 @@ public class TsFileProcessor {
this.managedByFlushManager = managedByFlushManager;
}
- /** Close this tsfile */
+ /**
+ * Close this tsfile
+ */
public void close() throws TsFileProcessorException {
try {
// When closing resource file, its corresponding mod file is also closed.
@@ -2166,7 +2215,9 @@ public class TsFileProcessor {
this.timeRangeId = timeRangeId;
}
- /** Release resource of a memtable */
+ /**
+ * Release resource of a memtable
+ */
public void putMemTableBackAndClose() throws TsFileProcessorException {
if (workMemTable != null) {
workMemTable.release();
@@ -2190,12 +2241,16 @@ public class TsFileProcessor {
return workMemTable != null ? workMemTable.getTVListsRamCost() : 0;
}
- /** Return Long.MAX_VALUE if workMemTable is null */
+ /**
+ * Return Long.MAX_VALUE if workMemTable is null
+ */
public long getWorkMemTableCreatedTime() {
return workMemTable != null ? workMemTable.getCreatedTime() :
Long.MAX_VALUE;
}
- /** Return Long.MAX_VALUE if workMemTable is null */
+ /**
+ * Return Long.MAX_VALUE if workMemTable is null
+ */
public long getWorkMemTableUpdateTime() {
return workMemTable != null ? workMemTable.getUpdateTime() :
Long.MAX_VALUE;
}