This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage_sync_code
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 340148a76844a596b616f553cd6b611a878a3a1a Merge: 9b4ae42f903 5467cfca81c Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon May 29 16:49:07 2023 +0800 Merge branch 'master' into tiered_storage .../iotdb/confignode/service/ConfigNode.java | 2 + .../request/BatchIndexedConsensusRequest.java | 6 +- .../consensus/iot/IoTConsensusServerImpl.java | 8 +- .../consensus/iot/logdispatcher/LogDispatcher.java | 4 +- docker/src/main/Dockerfile-1c1d | 3 + docs/UserGuide/Monitor-Alert/Metric-Tool.md | 3 +- docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md | 13 +- .../src/main/thrift/iotconsensus.thrift | 2 +- .../apache/iotdb/metrics/config/MetricConfig.java | 7 + .../iotdb/metrics/metricsets/UpTimeMetrics.java | 44 ++++++ .../commons/consensus/index/ProgressIndexType.java | 6 + .../consensus/index/impl/SimpleProgressIndex.java | 160 +++++++++++++++++++++ .../IoTConsensusDataRegionStateMachine.java | 2 +- .../iotdb/db/engine/storagegroup/DataRegion.java | 37 +++-- .../db/engine/storagegroup/TsFileProcessor.java | 5 +- .../db/engine/storagegroup/TsFileResource.java | 7 + .../{PipeLauncher.java => PipeAgentLauncher.java} | 6 +- .../db/pipe/agent/runtime/PipeRuntimeAgent.java | 33 +++-- .../SimpleConsensusProgressIndexAssigner.java | 112 +++++++++++++++ .../apache/iotdb/db/pipe/config/PipeConfig.java | 6 + .../core/collector/IoTDBDataRegionCollector.java | 5 +- .../PipeRealtimeDataRegionHybridCollector.java | 8 +- .../listener/PipeInsertionDataNodeListener.java | 7 + .../manager/PipeConnectorSubtaskLifeCycle.java | 8 +- .../manager/PipeConnectorSubtaskManager.java | 15 +- .../event/view/collector/PipeEventCollector.java | 6 +- .../executor/PipeSubtaskExecutorManager.java | 4 +- .../file/PipeHardlinkFileDirStartupCleaner.java | 59 ++++++++ .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 3 +- .../task/queue/ListenableBlockingPendingQueue.java | 153 +++++++++++++++++++- ... 
=> ListenableBoundedBlockingPendingQueue.java} | 5 +- .../db/pipe/task/queue/ListenablePendingQueue.java | 159 -------------------- ...> ListenableUnboundedBlockingPendingQueue.java} | 9 +- .../db/pipe/task/stage/PipeTaskCollectorStage.java | 8 +- .../db/pipe/task/stage/PipeTaskConnectorStage.java | 12 +- .../db/pipe/task/stage/PipeTaskProcessorStage.java | 14 +- .../db/pipe/task/subtask/PipeConnectorSubtask.java | 22 +-- .../db/pipe/task/subtask/PipeProcessorSubtask.java | 4 +- .../iotdb/db/pipe/task/subtask/PipeSubtask.java | 18 ++- .../java/org/apache/iotdb/db/service/DataNode.java | 2 +- .../db/service/metrics/DataNodeMetricsHelper.java | 2 + .../compaction/CompactionTaskManagerTest.java | 4 +- .../SizeTieredCompactionSelectorTest.java | 2 + .../core/collector/PipeRealtimeCollectTest.java | 10 +- .../executor/PipeConnectorSubtaskExecutorTest.java | 6 +- .../executor/PipeProcessorSubtaskExecutorTest.java | 2 + 46 files changed, 727 insertions(+), 286 deletions(-) diff --cc server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 69340daa0b4,e3ee6e73df9..7f3dfb11462 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@@ -1677,26 -1605,25 +1678,24 @@@ public class DataRegion implements IDat if (!resource.isClosed() || !resource.isDeleted() && resource.stillLives(ttlLowerBound)) { return; } - + // Try to set the resource to DELETED status and return if it failed + if (!resource.setStatus(TsFileResourceStatus.DELETED)) { + return; + } + tsFileManager.remove(resource, isSeq); // ensure that the file is not used by any queries - if (resource.tryWriteLock()) { - try { - // Try to set the resource to DELETED status and return if it failed - if (!resource.setStatus(TsFileResourceStatus.DELETED)) { - return; - } - // try to delete physical data file - resource.remove(); - tsFileManager.remove(resource, isSeq); - logger.info( - 
"Removed a file {} before {} by ttl ({} {})", - resource.getTsFilePath(), - new Date(ttlLowerBound), - dataTTL, - config.getTimestampPrecision()); - } finally { - resource.writeUnlock(); - } - + resource.writeLock(); + try { + // try to delete physical data file + resource.remove(); + logger.info( + "Removed a file {} before {} by ttl ({} {})", + resource.getTsFilePath(), + new Date(ttlLowerBound), + dataTTL, + config.getTimestampPrecision()); + } finally { + resource.writeUnlock(); } }
