This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-progress-index in repository https://gitbox.apache.org/repos/asf/iotdb.git.
commit 10a2028ccd1458ba9b551d97da636acf3bbaf611 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jan 17 16:54:28 2024 +0800 Pipe: make log info much clear with pipe name and region id added --- .../pipe/extractor/IoTDBDataRegionExtractor.java | 23 +++++++--- .../PipeHistoricalDataRegionTsFileExtractor.java | 53 ++++++++++++++++++---- 2 files changed, 61 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java index dfa8483ca34..58d274b82b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java @@ -220,14 +220,21 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor(); LOGGER.info( - "'{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE_KEY); + "Pipe {}@{}: '{}' is set to false, use fake realtime extractor.", + pipeName, + dataRegionId, + EXTRACTOR_REALTIME_ENABLE_KEY); return; } // Use hybrid mode by default if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); - LOGGER.info("'{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY); + LOGGER.info( + "Pipe {}@{}: '{}' is not set, use hybrid mode by default.", + pipeName, + dataRegionId, + EXTRACTOR_REALTIME_MODE_KEY); return; } @@ -248,7 +255,9 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); if (LOGGER.isWarnEnabled()) { LOGGER.warn( - "Unsupported extractor realtime mode: {}, create a hybrid extractor.", + "Pipe {}@{}: 
Unsupported extractor realtime mode: {}, create a hybrid extractor.", + pipeName, + dataRegionId, parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)); } } @@ -321,9 +330,11 @@ public class IoTDBDataRegionExtractor implements PipeExtractor { } catch (Exception e) { exceptionHolder.set(e); LOGGER.warn( - String.format( - "Start historical extractor %s and realtime extractor %s error.", - historicalExtractor, realtimeExtractor), + "Pipe {}@{}: Start historical extractor {} and realtime extractor {} error.", + pipeName, + dataRegionId, + historicalExtractor, + realtimeExtractor, e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index 917c4cd3308..a57de8113c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -251,7 +251,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa .contains("time"); LOGGER.info( - "historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}", + "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}", + pipeName, + dataRegionId, DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime), historicalDataExtractionStartTime, DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime), @@ -286,20 +288,42 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa dataRegion.writeLock("Pipe: start to extract historical TsFile"); final long startHistoricalExtractionTime = System.currentTimeMillis(); try { + LOGGER.info("Pipe 
{}@{}: start to flush data region", pipeName, dataRegionId); synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { final long lastFlushedByPipeTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId); if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) { dataRegion.syncCloseAllWorkingTsFileProcessors(); DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); + LOGGER.info( + "Pipe {}@{}: finish to flush data region, took {} ms", + pipeName, + dataRegionId, + System.currentTimeMillis() - startHistoricalExtractionTime); + } else { + LOGGER.info( + "Pipe {}@{}: skip to flush data region, last flushed time {} ms ago", + pipeName, + dataRegionId, + System.currentTimeMillis() - lastFlushedByPipeTime); } } final TsFileManager tsFileManager = dataRegion.getTsFileManager(); tsFileManager.readLock(); try { + final int originalSequenceTsFileCount = tsFileManager.size(true); + final int originalUnsequenceTsFileCount = tsFileManager.size(false); final List<TsFileResource> resourceList = - new ArrayList<>(tsFileManager.size(true) + tsFileManager.size(false)); + new ArrayList<>(originalSequenceTsFileCount + originalUnsequenceTsFileCount); + LOGGER.info( + "Pipe {}@{}: start to extract historical TsFile, original sequence file count {}, " + + "original unsequence file count {}, start progress index {}", + pipeName, + dataRegionId, + originalSequenceTsFileCount, + originalUnsequenceTsFileCount, + startIndex); final Collection<TsFileResource> sequenceTsFileResources = tsFileManager.getTsFileList(true).stream() @@ -343,11 +367,16 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa pendingQueue = new ArrayDeque<>(resourceList); LOGGER.info( - "Pipe: start to extract historical TsFile, data region {}, " - + "sequence file count {}, unsequence file count {}, historical extraction time {} ms", + "Pipe {}@{}: finish to extract historical TsFile, extracted sequence file count 
{}/{}, " + + "extracted unsequence file count {}/{}, extracted file count {}/{}, took {} ms", + pipeName, dataRegionId, sequenceTsFileResources.size(), + originalSequenceTsFileCount, unsequenceTsFileResources.size(), + originalUnsequenceTsFileCount, + resourceList.size(), + originalSequenceTsFileCount + originalUnsequenceTsFileCount, System.currentTimeMillis() - startHistoricalExtractionTime); } finally { tsFileManager.readUnlock(); @@ -373,9 +402,11 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa <= TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime(); } catch (IOException e) { LOGGER.warn( - String.format( - "failed to get the generation time of TsFile %s, extract it anyway", - resource.getTsFilePath()), + "Pipe {}@{}: failed to get the generation time of TsFile {}, extract it anyway" + + " (historical data extraction time lower bound: {})", + pipeName, + dataRegionId, + resource.getTsFilePath(), e); // If failed to get the generation time of the TsFile, we will extract the data in the TsFile // anyway. @@ -416,7 +447,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa PipeResourceManager.tsfile().unpinTsFileResource(resource); } catch (IOException e) { LOGGER.warn( - "Pipe: failed to unpin TsFileResource after creating event, original path: {}", + "Pipe {}@{}: failed to unpin TsFileResource after creating event, original path: {}", + pipeName, + dataRegionId, resource.getTsFilePath()); } @@ -441,7 +474,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa PipeResourceManager.tsfile().unpinTsFileResource(resource); } catch (IOException e) { LOGGER.warn( - "Pipe: failed to unpin TsFileResource after dropping pipe, original path: {}", + "Pipe {}@{}: failed to unpin TsFileResource after dropping pipe, original path: {}", + pipeName, + dataRegionId, resource.getTsFilePath()); } });
