This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch storage_engine_sonar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b0856ae8ff025a3ff6087f7d28774631f5c1d8ca Author: HTHou <[email protected]> AuthorDate: Sun Jun 25 15:35:55 2023 +0800 fix storage engine sonar bug --- .../iotdb/db/engine/storagegroup/DataRegion.java | 77 ++-------------------- .../db/engine/storagegroup/TsFileManager.java | 21 ------ .../db/engine/storagegroup/TsFileProcessor.java | 3 +- .../db/engine/storagegroup/TsFileResource.java | 14 ++-- .../db/engine/storagegroup/TsFileResourceList.java | 7 ++ .../db/engine/storagegroup/DataRegionTest.java | 30 --------- 6 files changed, 23 insertions(+), 129 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 468619f8a69..7962e3e635f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -456,7 +456,7 @@ public class DataRegion implements IDataRegionForQuery { // split by partition so that we can find the last file of each partition and decide to // close it or not DataRegionRecoveryContext dataRegionRecoveryContext = - new DataRegionRecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size()); + new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() + tmpUnseqTsFiles.size()); Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = splitResourcesByPartition(tmpSeqTsFiles); Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = @@ -745,15 +745,15 @@ public class DataRegion implements IDataRegionForQuery { return new Pair<>(ret, upgradeRet); } - private void continueFailedRenames(File fileFolder, String suffix) { + private void continueFailedRenames(File fileFolder, String suffix) throws IOException { File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix); if (files != null) { for (File tempResource : files) { File originResource = fsFactory.getFile(tempResource.getPath().replace(suffix, "")); if (originResource.exists()) { - tempResource.delete(); + Files.delete(tempResource.toPath()); } else { - tempResource.renameTo(originResource); + Files.move(tempResource.toPath(), originResource.toPath()); } } } @@ -2842,31 +2842,6 @@ public class DataRegion implements IDataRegionForQuery { return false; } - /** remove all partitions that satisfy a filter. */ - public void removePartitions(TimePartitionFilter filter) { - // this requires blocking all other activities - writeLock("removePartitions"); - try { - // abort ongoing compaction - abortCompaction(); - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - // Wait two seconds for the compaction thread to terminate - } - // close all working files that should be removed - removePartitions(filter, workSequenceTsFileProcessors.entrySet(), true); - removePartitions(filter, workUnsequenceTsFileProcessors.entrySet(), false); - - // remove data files - removePartitions(filter, tsFileManager.getIterator(true), true); - removePartitions(filter, tsFileManager.getIterator(false), false); - - } finally { - writeUnlock(); - } - } - public void abortCompaction() { tsFileManager.setAllowCompaction(false); List<AbstractCompactionTask> runningTasks = @@ -2876,49 +2851,7 @@ public class DataRegion implements IDataRegionForQuery { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { logger.error("Thread get interrupted when waiting compaction to finish", e); - break; - } - } - } - - // may remove the processorEntrys - private void removePartitions( - TimePartitionFilter filter, - Set<Entry<Long, TsFileProcessor>> processorEntrys, - boolean sequence) { - for (Iterator<Entry<Long, TsFileProcessor>> iterator = processorEntrys.iterator(); - iterator.hasNext(); ) { - Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next(); - long partitionId = longTsFileProcessorEntry.getKey(); - lastFlushTimeMap.removePartition(partitionId); - TimePartitionManager.getInstance() - .removePartition(new DataRegionId(Integer.valueOf(dataRegionId)), partitionId); - TsFileProcessor processor = longTsFileProcessorEntry.getValue(); - if (filter.satisfy(databaseName, partitionId)) { - processor.syncClose(); - iterator.remove(); - processor.getTsFileResource().remove(); - tsFileManager.remove(processor.getTsFileResource(), sequence); - logger.debug( - "{} is removed during deleting partitions", - processor.getTsFileResource().getTsFilePath()); - } - } - } - - // may remove the iterator's data - private void removePartitions( - TimePartitionFilter filter, Iterator<TsFileResource> iterator, boolean sequence) { - while (iterator.hasNext()) { - TsFileResource tsFileResource = iterator.next(); - if (filter.satisfy(databaseName, tsFileResource.getTimePartition())) { - tsFileResource.remove(); - tsFileManager.remove(tsFileResource, sequence); - lastFlushTimeMap.removePartition(tsFileResource.getTimePartition()); - TimePartitionManager.getInstance() - .removePartition( - new DataRegionId(Integer.valueOf(dataRegionId)), tsFileResource.getTimePartition()); - logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath()); + Thread.currentThread().interrupt(); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java index 7c43b8e3f5e..953ff7141a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.engine.storagegroup; -import org.apache.iotdb.db.exception.WriteLockFailedException; import org.apache.iotdb.db.rescon.TsFileResourceManager; import org.slf4j.Logger; @@ -33,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -301,25 +299,6 @@ public class TsFileManager { writeLockHolder = holder; } - /** - * Acquire write lock with timeout, {@link WriteLockFailedException} will be thrown after timeout. - * The unit of timeout is ms. - */ - public void writeLockWithTimeout(String holder, long timeout) throws WriteLockFailedException { - try { - if (resourceListLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS)) { - writeLockHolder = holder; - } else { - throw new WriteLockFailedException( - String.format("cannot get write lock in %d ms", timeout)); - } - } catch (InterruptedException e) { - LOGGER.warn(e.getMessage(), e); - Thread.interrupted(); - throw new WriteLockFailedException("thread is interrupted"); - } - } - public void writeUnlock() { resourceListLock.writeLock().unlock(); writeLockHolder = ""; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 8d499872195..b8c87cd5f43 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -1078,6 +1078,7 @@ public class TsFileProcessor { } /** This method will synchronize the memTable and release its flushing resources */ + @SuppressWarnings("squid:S2445") private void syncReleaseFlushedMemTable(IMemTable memTable) { synchronized (memTable) { releaseFlushedMemTable(memTable); @@ -1097,7 +1098,7 @@ public class TsFileProcessor { * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of * the flush manager pool */ - @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning + @SuppressWarnings({"squid:S3776", "squid:S2142"}) // Suppress high Cognitive Complexity warning public void flushOneMemTable() { IMemTable memTableToFlush = flushingMemTables.getFirst(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index bdb93bbd165..2d7d62d9e4e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -69,6 +69,7 @@ import java.util.Map; import java.util.Objects; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR; @@ -101,8 +102,10 @@ public class TsFileResource { /** time index */ protected ITimeIndex timeIndex; + @SuppressWarnings("squid:S3077") private volatile ModificationFile modFile; + @SuppressWarnings("squid:S3077") private volatile ModificationFile compactionModFile; protected AtomicReference<TsFileResourceStatus> atomicStatus = @@ -136,7 +139,7 @@ public class TsFileResource { private long ramSize; - private volatile int tierLevel = 0; + private AtomicInteger tierLevel; private volatile long tsFileSize = -1L; @@ -193,7 +196,7 @@ public class TsFileResource { this.isSeq = FilePathUtils.isSequence(this.file.getAbsolutePath()); // This method is invoked when DataNode recovers, so the tierLevel should be calculated when // restarting - this.tierLevel = TierManager.getInstance().getFileTierLevel(file); + this.tierLevel.set(TierManager.getInstance().getFileTierLevel(file)); } /** Used for compaction to create target files. */ @@ -211,7 +214,7 @@ public class TsFileResource { this.isSeq = processor.isSequence(); // this method is invoked when a new TsFile is created and a newly created TsFile's the // tierLevel is 0 by default - this.tierLevel = 0; + this.tierLevel.set(0); } /** unsealed TsFile, for query */ @@ -391,6 +394,7 @@ public class TsFileResource { return pathToReadOnlyMemChunkMap.get(seriesPath); } + @SuppressWarnings("squid:S2886") public ModificationFile getModFile() { if (modFile == null) { synchronized (this) { @@ -435,11 +439,11 @@ public class TsFileResource { } public void increaseTierLevel() { - this.tierLevel++; + this.tierLevel.addAndGet(1); } public int getTierLevel() { - return tierLevel; + return tierLevel.get(); } public long getTsFileSize() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java index 86c7575f823..f2b0b57b773 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.ListIterator; +import java.util.NoSuchElementException; import java.util.concurrent.locks.ReentrantReadWriteLock; public class TsFileResourceList implements List<TsFileResource> { @@ -378,6 +379,9 @@ public class TsFileResourceList implements List<TsFileResource> { @Override public TsFileResource next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } return this.tsFileResourceList.get(currentIndex++); } } @@ -398,6 +402,9 @@ public class TsFileResourceList implements List<TsFileResource> { @Override public TsFileResource next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } return tsFileResourceList.get(currentIndex--); } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java index 0088a8b2ec9..589be638e71 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java @@ -222,36 +222,6 @@ public class DataRegionTest { } } - @Test - public void testInsertDataAndRemovePartitionAndInsert() - throws WriteProcessException, QueryProcessException, IllegalPathException { - for (int j = 0; j < 10; j++) { - TSRecord record = new TSRecord(j, deviceId); - record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - dataRegion.insert(buildInsertRowNodeByTSRecord(record)); - dataRegion.asyncCloseAllWorkingTsFileProcessors(); - } - dataRegion.syncCloseAllWorkingTsFileProcessors(); - - dataRegion.removePartitions((storageGroupName, timePartitionId) -> true); - - for (int j = 0; j < 10; j++) { - TSRecord record = new TSRecord(j, deviceId); - record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); - dataRegion.insert(buildInsertRowNodeByTSRecord(record)); - dataRegion.asyncCloseAllWorkingTsFileProcessors(); - } - dataRegion.syncCloseAllWorkingTsFileProcessors(); - - QueryDataSource queryDataSource = - dataRegion.query( - Collections.singletonList(new PartialPath(deviceId, measurementId)), - deviceId, - context, - null); - Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); - } - @Test public void testIoTDBTabletWriteAndSyncClose() throws QueryProcessException, IllegalPathException, WriteProcessException {
