This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e9e01f3bdba Pipe: make PipeHistoricalDataRegionExtractor &
PipeRealtimeDataRegionExtractor log info much clear with pipe name and region
id added (#11920)
e9e01f3bdba is described below
commit e9e01f3bdba9c388eb73a94a24b4c3706f3ef133
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jan 17 18:41:45 2024 +0800
Pipe: make PipeHistoricalDataRegionExtractor &
PipeRealtimeDataRegionExtractor log info much clear with pipe name and region
id added (#11920)
---
.../pipe/extractor/IoTDBDataRegionExtractor.java | 23 ++++++---
.../PipeHistoricalDataRegionTsFileExtractor.java | 54 ++++++++++++++++++----
2 files changed, 62 insertions(+), 15 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index dfa8483ca34..58d274b82b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -220,14 +220,21 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
LOGGER.info(
- "'{}' is set to false, use fake realtime extractor.",
EXTRACTOR_REALTIME_ENABLE_KEY);
+ "Pipe {}@{}: '{}' is set to false, use fake realtime extractor.",
+ pipeName,
+ dataRegionId,
+ EXTRACTOR_REALTIME_ENABLE_KEY);
return;
}
// Use hybrid mode by default
if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY)) {
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
- LOGGER.info("'{}' is not set, use hybrid mode by default.",
EXTRACTOR_REALTIME_MODE_KEY);
+ LOGGER.info(
+ "Pipe {}@{}: '{}' is not set, use hybrid mode by default.",
+ pipeName,
+ dataRegionId,
+ EXTRACTOR_REALTIME_MODE_KEY);
return;
}
@@ -248,7 +255,9 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
- "Unsupported extractor realtime mode: {}, create a hybrid
extractor.",
+ "Pipe {}@{}: Unsupported extractor realtime mode: {}, create a
hybrid extractor.",
+ pipeName,
+ dataRegionId,
parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY,
SOURCE_REALTIME_MODE_KEY));
}
}
@@ -321,9 +330,11 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
} catch (Exception e) {
exceptionHolder.set(e);
LOGGER.warn(
- String.format(
- "Start historical extractor %s and realtime extractor %s error.",
- historicalExtractor, realtimeExtractor),
+ "Pipe {}@{}: Start historical extractor {} and realtime extractor {}
error.",
+ pipeName,
+ dataRegionId,
+ historicalExtractor,
+ realtimeExtractor,
e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 917c4cd3308..a64d2a4ae7f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -251,7 +251,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
.contains("time");
LOGGER.info(
- "historical data extraction time range, start time {}({}), end time
{}({}), sloppy time range {}",
+ "Pipe {}@{}: historical data extraction time range, start time {}({}),
end time {}({}), sloppy time range {}",
+ pipeName,
+ dataRegionId,
DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
historicalDataExtractionStartTime,
DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
@@ -286,20 +288,42 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
dataRegion.writeLock("Pipe: start to extract historical TsFile");
final long startHistoricalExtractionTime = System.currentTimeMillis();
try {
+ LOGGER.info("Pipe {}@{}: start to flush data region", pipeName,
dataRegionId);
synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
final long lastFlushedByPipeTime =
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
if (System.currentTimeMillis() - lastFlushedByPipeTime >=
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
dataRegion.syncCloseAllWorkingTsFileProcessors();
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId,
System.currentTimeMillis());
+ LOGGER.info(
+ "Pipe {}@{}: finish to flush data region, took {} ms",
+ pipeName,
+ dataRegionId,
+ System.currentTimeMillis() - startHistoricalExtractionTime);
+ } else {
+ LOGGER.info(
+ "Pipe {}@{}: skip to flush data region, last flushed time {} ms
ago",
+ pipeName,
+ dataRegionId,
+ System.currentTimeMillis() - lastFlushedByPipeTime);
}
}
final TsFileManager tsFileManager = dataRegion.getTsFileManager();
tsFileManager.readLock();
try {
+ final int originalSequenceTsFileCount = tsFileManager.size(true);
+ final int originalUnsequenceTsFileCount = tsFileManager.size(false);
final List<TsFileResource> resourceList =
- new ArrayList<>(tsFileManager.size(true) +
tsFileManager.size(false));
+ new ArrayList<>(originalSequenceTsFileCount +
originalUnsequenceTsFileCount);
+ LOGGER.info(
+ "Pipe {}@{}: start to extract historical TsFile, original sequence
file count {}, "
+ + "original unsequence file count {}, start progress index {}",
+ pipeName,
+ dataRegionId,
+ originalSequenceTsFileCount,
+ originalUnsequenceTsFileCount,
+ startIndex);
final Collection<TsFileResource> sequenceTsFileResources =
tsFileManager.getTsFileList(true).stream()
@@ -343,11 +367,16 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
pendingQueue = new ArrayDeque<>(resourceList);
LOGGER.info(
- "Pipe: start to extract historical TsFile, data region {}, "
- + "sequence file count {}, unsequence file count {},
historical extraction time {} ms",
+ "Pipe {}@{}: finish to extract historical TsFile, extracted
sequence file count {}/{}, "
+ + "extracted unsequence file count {}/{}, extracted file count
{}/{}, took {} ms",
+ pipeName,
dataRegionId,
sequenceTsFileResources.size(),
+ originalSequenceTsFileCount,
unsequenceTsFileResources.size(),
+ originalUnsequenceTsFileCount,
+ resourceList.size(),
+ originalSequenceTsFileCount + originalUnsequenceTsFileCount,
System.currentTimeMillis() - startHistoricalExtractionTime);
} finally {
tsFileManager.readUnlock();
@@ -373,9 +402,12 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
<=
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
} catch (IOException e) {
LOGGER.warn(
- String.format(
- "failed to get the generation time of TsFile %s, extract it
anyway",
- resource.getTsFilePath()),
+ "Pipe {}@{}: failed to get the generation time of TsFile {}, extract
it anyway"
+ + " (historical data extraction time lower bound: {})",
+ pipeName,
+ dataRegionId,
+ resource.getTsFilePath(),
+ historicalDataExtractionTimeLowerBound,
e);
// If failed to get the generation time of the TsFile, we will extract
the data in the TsFile
// anyway.
@@ -416,7 +448,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
PipeResourceManager.tsfile().unpinTsFileResource(resource);
} catch (IOException e) {
LOGGER.warn(
- "Pipe: failed to unpin TsFileResource after creating event, original
path: {}",
+ "Pipe {}@{}: failed to unpin TsFileResource after creating event,
original path: {}",
+ pipeName,
+ dataRegionId,
resource.getTsFilePath());
}
@@ -441,7 +475,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
PipeResourceManager.tsfile().unpinTsFileResource(resource);
} catch (IOException e) {
LOGGER.warn(
- "Pipe: failed to unpin TsFileResource after dropping pipe,
original path: {}",
+ "Pipe {}@{}: failed to unpin TsFileResource after dropping
pipe, original path: {}",
+ pipeName,
+ dataRegionId,
resource.getTsFilePath());
}
});