This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch compation-log in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ea15c88f290ec80d063c44fcf303ac3971f59c14 Author: shuwenwei <[email protected]> AuthorDate: Fri Jun 20 11:34:33 2025 +0800 modify TsFileProcessor --- .../dataregion/flush/MemTableFlushTask.java | 3 +- .../dataregion/memtable/TsFileProcessor.java | 141 +++++++++++---------- 2 files changed, 74 insertions(+), 70 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index d1a3f3c5bab..8d586dfc619 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.storageengine.dataregion.flush; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; @@ -60,7 +61,7 @@ import java.util.concurrent.TimeUnit; */ public class MemTableFlushTask { - private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.WRITE_LOGGER_NAME); private static final FlushSubTaskPoolManager SUB_TASK_POOL_MANAGER = FlushSubTaskPoolManager.getInstance(); private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index bd4c11e969b..9cc35f11d3a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedPath; @@ -120,7 +121,9 @@ import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORK public class TsFileProcessor { /** Logger fot this class. */ - private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class); + private static final Logger writeLogger = LoggerFactory.getLogger(IoTDBConstant.WRITE_LOGGER_NAME); + private static final Logger queryLogger = LoggerFactory.getLogger(IoTDBConstant.QUERY_LOGGER_NAME); + private static final Logger otherLogger = LoggerFactory.getLogger(TsFileProcessor.class); private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance(); @@ -226,7 +229,7 @@ public class TsFileProcessor { flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE); flushListeners.add(this.walNode); closeFileListeners.add(closeUnsealedTsFileProcessor); - logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath()); + writeLogger.info("create a new tsfile processor {}", tsfile.getAbsolutePath()); } @SuppressWarnings("java:S107") // ignore number of arguments @@ -250,7 +253,7 @@ public class TsFileProcessor { flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE); flushListeners.add(this.walNode); closeFileListeners.add(closeUnsealedTsFileProcessor); - logger.info("reopen a tsfile processor {}", tsFileResource.getTsFile()); + writeLogger.info("reopen a tsfile processor {}", tsFileResource.getTsFile()); } /** @@ -296,7 +299,7 @@ public class TsFileProcessor { } } catch (Exception e) { rollbackMemoryInfo(memIncrements); - logger.warn("Exception during wal flush", e); + writeLogger.warn("Exception during wal flush", e); throw new WriteProcessException( String.format( "%s: %s write WAL failed: %s", @@ -392,7 +395,7 @@ public class TsFileProcessor { } } catch (Exception e) { rollbackMemoryInfo(memIncrements); - logger.warn("Exception during wal flush", e); + writeLogger.warn("Exception during wal flush", e); throw new WriteProcessException( String.format( "%s: %s write WAL failed: %s", @@ -1028,7 +1031,7 @@ public class TsFileProcessor { logFlushQueryWriteLocked(); try { if (workMemTable != null) { - logger.info( + writeLogger.info( "[Deletion] Deletion with path: {}, time:{}-{} in workMemTable", deletion.getPath(), deletion.getStartTime(), @@ -1069,7 +1072,7 @@ public class TsFileProcessor { @TestOnly public void syncClose() throws ExecutionException { - logger.info( + writeLogger.info( "Sync close file: {}, will firstly async close it", tsFileResource.getTsFile().getAbsolutePath()); if (shouldClose) { @@ -1077,7 +1080,7 @@ public class TsFileProcessor { } try { asyncClose().get(); - logger.info("Start to wait until file {} is closed", tsFileResource); + writeLogger.info("Start to wait until file {} is closed", tsFileResource); // if this TsFileProcessor is closing, asyncClose().get() of this thread will return quickly, // but the TsFileProcessor may be not closed. Therefore, we need to check whether the writer // is null. @@ -1087,7 +1090,7 @@ public class TsFileProcessor { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - logger.info("File {} is closed synchronously", tsFileResource.getTsFile().getAbsolutePath()); + writeLogger.info("File {} is closed synchronously", tsFileResource.getTsFile().getAbsolutePath()); } /** async close one tsfile, register and close it by another thread */ @@ -1095,9 +1098,9 @@ public class TsFileProcessor { flushQueryLock.writeLock().lock(); logFlushQueryWriteLocked(); try { - if (logger.isDebugEnabled()) { + if (writeLogger.isDebugEnabled()) { if (workMemTable != null) { - logger.debug( + writeLogger.debug( "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile " + "size: {}, plan index: [{}, {}], progress index: {}", storageGroupName, @@ -1108,7 +1111,7 @@ public class TsFileProcessor { workMemTable.getMaxPlanIndex(), tsFileResource.getMaxProgressIndex()); } else { - logger.debug( + writeLogger.debug( "{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), @@ -1145,7 +1148,7 @@ public class TsFileProcessor { shouldClose = true; return future; } catch (Exception e) { - logger.error( + writeLogger.error( "{}: {} async close failed, because", storageGroupName, tsFileResource.getTsFile().getName(), @@ -1169,8 +1172,8 @@ public class TsFileProcessor { logFlushQueryWriteLocked(); try { tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable; - if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) { - logger.debug( + if (writeLogger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) { + writeLogger.debug( "{}: {} add a signal memtable into flushing memtable list when sync flush", storageGroupName, tsFileResource.getTsFile().getName()); @@ -1188,14 +1191,14 @@ public class TsFileProcessor { flushingMemTables.wait(1000); if ((System.currentTimeMillis() - startWait) > 60_000) { - logger.warn( + writeLogger.warn( "has waited for synced flushing a memtable in {} for 60 seconds.", this.tsFileResource.getTsFile().getAbsolutePath()); startWait = System.currentTimeMillis(); } } } catch (InterruptedException e) { - logger.error( + writeLogger.error( "{}: {} wait flush finished meets error", storageGroupName, tsFileResource.getTsFile().getName(), @@ -1213,11 +1216,11 @@ public class TsFileProcessor { if (workMemTable == null) { return; } - logger.info( + writeLogger.info( "Async flush a memtable to tsfile: {}", tsFileResource.getTsFile().getAbsolutePath()); addAMemtableIntoFlushingList(workMemTable); } catch (Exception e) { - logger.error( + writeLogger.error( "{}: {} add a memtable into flushing list failed", storageGroupName, tsFileResource.getTsFile().getName(), @@ -1257,8 +1260,8 @@ public class TsFileProcessor { SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost()); flushingMemTables.addLast(tobeFlushed); - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "{}: {} Memtable (signal = {}) is added into the flushing Memtable, queue size = {}", storageGroupName, tsFileResource.getTsFile().getName(), @@ -1284,13 +1287,13 @@ public class TsFileProcessor { try { writer.makeMetadataVisible(); if (!flushingMemTables.remove(memTable)) { - logger.warn( + writeLogger.warn( "{}: {} put the memtable (signal={}) out of flushingMemtables but it is not in the queue.", storageGroupName, tsFileResource.getTsFile().getName(), memTable.isSignalMemTable()); - } else if (logger.isDebugEnabled()) { - logger.debug( + } else if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "{}: {} memtable (signal={}) is removed from the queue. {} left.", storageGroupName, tsFileResource.getTsFile().getName(), @@ -1301,8 +1304,8 @@ public class TsFileProcessor { MemTableManager.getInstance().decreaseMemtableNumber(); // Reset the mem cost in StorageGroupProcessorInfo dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost()); - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "[mem control] {}: {} flush finished, try to reset system mem cost, " + "flushing memtable list size: {}", storageGroupName, @@ -1312,8 +1315,8 @@ public class TsFileProcessor { // Report to System SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo); SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost()); - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "{}: {} flush finished, remove a memtable from flushing list, " + "flushing memtable list size: {}", storageGroupName, @@ -1321,7 +1324,7 @@ public class TsFileProcessor { flushingMemTables.size()); } } catch (Exception e) { - logger.error("{}: {}", storageGroupName, tsFileResource.getTsFile().getName(), e); + writeLogger.error("{}: {}", storageGroupName, tsFileResource.getTsFile().getName(), e); } finally { flushQueryLock.writeLock().unlock(); logFlushQueryWriteUnlocked(); @@ -1333,8 +1336,8 @@ public class TsFileProcessor { synchronized (flushingMemTables) { releaseFlushedMemTable(memTable); flushingMemTables.notifyAll(); - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "{}: {} released a memtable (signal={}), flushingMemtables size ={}", storageGroupName, tsFileResource.getTsFile().getName(), @@ -1355,7 +1358,7 @@ public class TsFileProcessor { // Signal memtable only may appear when calling asyncClose() if (!memTableToFlush.isSignalMemTable()) { if (memTableToFlush.isEmpty()) { - logger.info( + writeLogger.info( "This normal memtable is empty, skip flush. {}: {}", storageGroupName, tsFileResource.getTsFile().getName()); @@ -1372,7 +1375,7 @@ public class TsFileProcessor { memTableFlushPointCount = memTableToFlush.getTotalPointsNum(); } catch (Throwable e) { if (writer == null) { - logger.info( + writeLogger.info( "{}: {} is closed during flush, abandon flush task", storageGroupName, tsFileResource.getTsFile().getAbsolutePath()); @@ -1380,21 +1383,21 @@ public class TsFileProcessor { flushingMemTables.notifyAll(); } } else { - logger.error( + writeLogger.error( "{}: {} meet error when flushing a memtable, change system mode to error", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e); CommonDescriptor.getInstance().getConfig().handleUnrecoverableError(); try { - logger.error( + writeLogger.error( "{}: {} IOTask meets error, truncate the corrupted data", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), e); writer.reset(); } catch (IOException e1) { - logger.error( + writeLogger.error( "{}: {} Truncate corrupted data meets error", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), @@ -1416,7 +1419,7 @@ public class TsFileProcessor { flushingMemTables.notifyAll(); } } catch (Exception e1) { - logger.error( + writeLogger.error( "{}: {} Release resource meets error", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), @@ -1438,7 +1441,7 @@ public class TsFileProcessor { this.tsFileResource.getModFile().write(entry.left); tsFileResource.getModFile().close(); iterator.remove(); - logger.info( + otherLogger.info( "[Deletion] Deletion with path: {}, time:{}-{} written when flush memtable", entry.left.getPath(), ((Deletion) (entry.left)).getStartTime(), @@ -1446,7 +1449,7 @@ public class TsFileProcessor { } } } catch (IOException e) { - logger.error( + otherLogger.error( "Meet error when writing into ModificationFile file of {} ", tsFileResource.getTsFile().getAbsolutePath(), e); @@ -1454,8 +1457,8 @@ public class TsFileProcessor { flushQueryLock.writeLock().unlock(); } - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "{}: {} try get lock to release a memtable (signal={})", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), @@ -1467,7 +1470,7 @@ public class TsFileProcessor { try { writer.getTsFileOutput().force(); } catch (IOException e) { - logger.error("fsync memTable data to disk error,", e); + writeLogger.error("fsync memTable data to disk error,", e); } // Call flushed listener after memtable is released safely @@ -1483,19 +1486,19 @@ public class TsFileProcessor { } else { writer.mark(); updateCompressionRatio(); - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "{}: {} flushingMemtables is empty and will close the file", storageGroupName, tsFileResource.getTsFile().getAbsolutePath()); } endFile(); } - if (logger.isDebugEnabled()) { - logger.debug("{} flushingMemtables is clear", storageGroupName); + if (writeLogger.isDebugEnabled()) { + writeLogger.debug("{} flushingMemtables is clear", storageGroupName); } } catch (Exception e) { - logger.error( + writeLogger.error( "{}: {} marking or ending file meet error", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), @@ -1504,7 +1507,7 @@ public class TsFileProcessor { try { writer.reset(); } catch (IOException e1) { - logger.error( + writeLogger.error( "{}: {} truncate corrupted data meets error", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), @@ -1512,7 +1515,7 @@ public class TsFileProcessor { } // Retry or set read-only if (retryCnt < 3) { - logger.warn( + writeLogger.warn( "{} meet error when flush FileMetadata to {}, retry it again", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), @@ -1520,7 +1523,7 @@ public class TsFileProcessor { retryCnt++; continue; } else { - logger.error( + writeLogger.error( "{} meet error when flush FileMetadata to {}, change system mode to error", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), @@ -1530,8 +1533,8 @@ public class TsFileProcessor { } } // For sync close - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "{}: {} try to get flushingMemtables lock.", storageGroupName, tsFileResource.getTsFile().getAbsolutePath()); @@ -1545,7 +1548,7 @@ public class TsFileProcessor { private void updateCompressionRatio() { try { double compressionRatio = ((double) totalMemTableSize) / writer.getPos(); - logger.info( + writeLogger.info( "The compression ratio of tsfile {} is {}, totalMemTableSize: {}, the file size: {}", writer.getFile().getAbsolutePath(), String.format("%.2f", compressionRatio), @@ -1556,7 +1559,7 @@ public class TsFileProcessor { .recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId, compressionRatio); CompressionRatio.getInstance().updateRatio(totalMemTableSize, writer.getPos()); } catch (IOException e) { - logger.error( + writeLogger.error( "{}: {} update compression ratio failed", storageGroupName, tsFileResource.getTsFile().getName(), @@ -1566,14 +1569,14 @@ public class TsFileProcessor { /** end file and write some meta */ private void endFile() throws IOException, TsFileProcessorException { - if (logger.isDebugEnabled()) { - logger.debug("Start to end file {}", tsFileResource); + if (writeLogger.isDebugEnabled()) { + writeLogger.debug("Start to end file {}", tsFileResource); } writer.endFile(); tsFileResource.serialize(); FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource); - if (logger.isDebugEnabled()) { - logger.debug("Ended file {}", tsFileResource); + if (writeLogger.isDebugEnabled()) { + writeLogger.debug("Ended file {}", tsFileResource); } // Remove this processor from Closing list in StorageGroupProcessor, // Mark the TsFileResource closed, no need writer anymore @@ -1589,7 +1592,7 @@ public class TsFileProcessor { /** End empty file and remove it from file system */ private void endEmptyFile() throws TsFileProcessorException, IOException { - logger.info("Start to end empty file {}", tsFileResource); + writeLogger.info("Start to end empty file {}", tsFileResource); // Remove this processor from Closing list in DataRegion, // Mark the TsFileResource closed, no need writer anymore @@ -1599,7 +1602,7 @@ public class TsFileProcessor { } tsFileProcessorInfo.clear(); dataRegionInfo.closeTsFileProcessorAndReportToSystem(this); - logger.info( + writeLogger.info( "Storage group {} close and remove empty file {}", storageGroupName, tsFileResource.getTsFile().getAbsoluteFile()); @@ -1856,7 +1859,7 @@ public class TsFileProcessor { } } } catch (QueryProcessException | MetadataException | IOException e) { - logger.error( + queryLogger.error( "{}: {} get ReadOnlyMemChunk has error", storageGroupName, tsFileResource.getTsFile().getName(), @@ -1940,7 +1943,7 @@ public class TsFileProcessor { } } } catch (QueryProcessException | MetadataException | IOException e) { - logger.error( + queryLogger.error( "{}: {} get ReadOnlyMemChunk has error", storageGroupName, tsFileResource.getTsFile().getName(), @@ -2019,7 +2022,7 @@ public class TsFileProcessor { } } } catch (QueryProcessException | MetadataException e) { - logger.error( + queryLogger.error( "{}: {} get ReadOnlyMemChunk has error", storageGroupName, tsFileResource.getTsFile().getName(), @@ -2141,22 +2144,22 @@ public class TsFileProcessor { } private void logFlushQueryWriteLocked() { - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); } } private void logFlushQueryWriteUnlocked() { - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName()); } } private void logFlushQueryReadUnlocked() { - if (logger.isDebugEnabled()) { - logger.debug( + if (writeLogger.isDebugEnabled()) { + writeLogger.debug( "{}: {} release flushQueryLock", storageGroupName, tsFileResource.getTsFile().getName()); } }
