This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-progress-index
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 10a2028ccd1458ba9b551d97da636acf3bbaf611
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jan 17 16:54:28 2024 +0800

    Pipe: make log info much clearer with pipe name and region id added
---
 .../pipe/extractor/IoTDBDataRegionExtractor.java   | 23 +++++++---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 53 ++++++++++++++++++----
 2 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index dfa8483ca34..58d274b82b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -220,14 +220,21 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
         EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) {
       realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
       LOGGER.info(
-          "'{}' is set to false, use fake realtime extractor.", 
EXTRACTOR_REALTIME_ENABLE_KEY);
+          "Pipe {}@{}: '{}' is set to false, use fake realtime extractor.",
+          pipeName,
+          dataRegionId,
+          EXTRACTOR_REALTIME_ENABLE_KEY);
       return;
     }
 
     // Use hybrid mode by default
     if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY)) {
       realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
-      LOGGER.info("'{}' is not set, use hybrid mode by default.", 
EXTRACTOR_REALTIME_MODE_KEY);
+      LOGGER.info(
+          "Pipe {}@{}: '{}' is not set, use hybrid mode by default.",
+          pipeName,
+          dataRegionId,
+          EXTRACTOR_REALTIME_MODE_KEY);
       return;
     }
 
@@ -248,7 +255,9 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
         realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
         if (LOGGER.isWarnEnabled()) {
           LOGGER.warn(
-              "Unsupported extractor realtime mode: {}, create a hybrid 
extractor.",
+              "Pipe {}@{}: Unsupported extractor realtime mode: {}, create a 
hybrid extractor.",
+              pipeName,
+              dataRegionId,
               parameters.getStringByKeys(EXTRACTOR_REALTIME_MODE_KEY, 
SOURCE_REALTIME_MODE_KEY));
         }
     }
@@ -321,9 +330,11 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
     } catch (Exception e) {
       exceptionHolder.set(e);
       LOGGER.warn(
-          String.format(
-              "Start historical extractor %s and realtime extractor %s error.",
-              historicalExtractor, realtimeExtractor),
+          "Pipe {}@{}: Start historical extractor {} and realtime extractor {} 
error.",
+          pipeName,
+          dataRegionId,
+          historicalExtractor,
+          realtimeExtractor,
           e);
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 917c4cd3308..a57de8113c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -251,7 +251,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             .contains("time");
 
     LOGGER.info(
-        "historical data extraction time range, start time {}({}), end time 
{}({}), sloppy time range {}",
+        "Pipe {}@{}: historical data extraction time range, start time {}({}), 
end time {}({}), sloppy time range {}",
+        pipeName,
+        dataRegionId,
         DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
         historicalDataExtractionStartTime,
         DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
@@ -286,20 +288,42 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     dataRegion.writeLock("Pipe: start to extract historical TsFile");
     final long startHistoricalExtractionTime = System.currentTimeMillis();
     try {
+      LOGGER.info("Pipe {}@{}: start to flush data region", pipeName, 
dataRegionId);
       synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
         final long lastFlushedByPipeTime =
             DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
         if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
           dataRegion.syncCloseAllWorkingTsFileProcessors();
           DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, 
System.currentTimeMillis());
+          LOGGER.info(
+              "Pipe {}@{}: finish to flush data region, took {} ms",
+              pipeName,
+              dataRegionId,
+              System.currentTimeMillis() - startHistoricalExtractionTime);
+        } else {
+          LOGGER.info(
+              "Pipe {}@{}: skip to flush data region, last flushed time {} ms 
ago",
+              pipeName,
+              dataRegionId,
+              System.currentTimeMillis() - lastFlushedByPipeTime);
         }
       }
 
       final TsFileManager tsFileManager = dataRegion.getTsFileManager();
       tsFileManager.readLock();
       try {
+        final int originalSequenceTsFileCount = tsFileManager.size(true);
+        final int originalUnsequenceTsFileCount = tsFileManager.size(false);
         final List<TsFileResource> resourceList =
-            new ArrayList<>(tsFileManager.size(true) + 
tsFileManager.size(false));
+            new ArrayList<>(originalSequenceTsFileCount + 
originalUnsequenceTsFileCount);
+        LOGGER.info(
+            "Pipe {}@{}: start to extract historical TsFile, original sequence 
file count {}, "
+                + "original unsequence file count {}, start progress index {}",
+            pipeName,
+            dataRegionId,
+            originalSequenceTsFileCount,
+            originalUnsequenceTsFileCount,
+            startIndex);
 
         final Collection<TsFileResource> sequenceTsFileResources =
             tsFileManager.getTsFileList(true).stream()
@@ -343,11 +367,16 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
         pendingQueue = new ArrayDeque<>(resourceList);
 
         LOGGER.info(
-            "Pipe: start to extract historical TsFile, data region {}, "
-                + "sequence file count {}, unsequence file count {}, 
historical extraction time {} ms",
+            "Pipe {}@{}: finish to extract historical TsFile, extracted 
sequence file count {}/{}, "
+                + "extracted unsequence file count {}/{}, extracted file count 
{}/{}, took {} ms",
+            pipeName,
             dataRegionId,
             sequenceTsFileResources.size(),
+            originalSequenceTsFileCount,
             unsequenceTsFileResources.size(),
+            originalUnsequenceTsFileCount,
+            resourceList.size(),
+            originalSequenceTsFileCount + originalUnsequenceTsFileCount,
             System.currentTimeMillis() - startHistoricalExtractionTime);
       } finally {
         tsFileManager.readUnlock();
@@ -373,9 +402,11 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
           <= 
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
     } catch (IOException e) {
       LOGGER.warn(
-          String.format(
-              "failed to get the generation time of TsFile %s, extract it 
anyway",
-              resource.getTsFilePath()),
+          "Pipe {}@{}: failed to get the generation time of TsFile {}, extract 
it anyway"
+              + " (historical data extraction time lower bound: {})",
+          pipeName,
+          dataRegionId,
+          resource.getTsFilePath(),
           e);
       // If failed to get the generation time of the TsFile, we will extract 
the data in the TsFile
       // anyway.
@@ -416,7 +447,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       PipeResourceManager.tsfile().unpinTsFileResource(resource);
     } catch (IOException e) {
       LOGGER.warn(
-          "Pipe: failed to unpin TsFileResource after creating event, original 
path: {}",
+          "Pipe {}@{}: failed to unpin TsFileResource after creating event, 
original path: {}",
+          pipeName,
+          dataRegionId,
           resource.getTsFilePath());
     }
 
@@ -441,7 +474,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
               PipeResourceManager.tsfile().unpinTsFileResource(resource);
             } catch (IOException e) {
               LOGGER.warn(
-                  "Pipe: failed to unpin TsFileResource after dropping pipe, 
original path: {}",
+                  "Pipe {}@{}: failed to unpin TsFileResource after dropping 
pipe, original path: {}",
+                  pipeName,
+                  dataRegionId,
                   resource.getTsFilePath());
             }
           });

Reply via email to