This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cf4aed79c5d Pipe: Support `"source.history.loose-range" = "path"` in iotdb-source  (#12651)
cf4aed79c5d is described below

commit cf4aed79c5ddabba277ddcdf6275992408e7f817
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 12 21:20:51 2024 +0800

    Pipe: Support `"source.history.loose-range" = "path"` in iotdb-source  (#12651)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeExtractorIT.java   |  97 +++++++++++++++++++
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 104 +++++++++++++++------
 .../resource/tsfile/PipeTsFileResourceManager.java |  38 ++++----
 .../config/constant/PipeExtractorConstant.java     |   3 +
 4 files changed, 197 insertions(+), 45 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index 1befb2b765d..f83699f30b7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -24,7 +24,9 @@ import 
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
@@ -32,6 +34,7 @@ import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -51,6 +54,39 @@ import static org.junit.Assert.fail;
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeExtractorIT extends AbstractPipeDualAutoIT {
+
+  @Before
+  public void setUp() {
+    MultiEnvFactory.createEnv(2);
+    senderEnv = MultiEnvFactory.getEnv(0);
+    receiverEnv = MultiEnvFactory.getEnv(1);
+
+    // TODO: delete ratis configurations
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        // Disable sender compaction for tsfile determination in loose range test
+        .setEnableSeqSpaceCompaction(false)
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false);
+    receiverEnv
+        .getConfig()
+        .getCommonConfig()
+        .setAutoCreateSchemaEnabled(true)
+        .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+        
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+
+    // 10 min, assert that the operations will not time out
+    senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+    receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
+
   @Test
   public void testExtractorValidParameter() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -867,6 +903,67 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
     }
   }
 
+  @Test
+  public void testLooseRange() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              // TsFile 1, extracted without parse
+              "insert into root.db.d1 (time, at1, at2)" + " values (1000, 1, 
2), (2000, 3, 4)",
+              // TsFile 2, not extracted because pattern not overlapped
+              "insert into root.db1.d1 (time, at1, at2)" + " values (1000, 1, 
2), (2000, 3, 4)",
+              "flush"))) {
+        return;
+      }
+
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              // TsFile 3, not extracted because time range not overlapped
+              "insert into root.db.d1 (time, at1, at2)" + " values (3000, 1, 
2), (4000, 3, 4)",
+              "flush"))) {
+        return;
+      }
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      extractorAttributes.put("source.path", "root.db.d1.at1");
+      extractorAttributes.put("source.inclusion", "data.insert");
+      extractorAttributes.put("source.history.start-time", "1500");
+      extractorAttributes.put("source.history.end-time", "2500");
+      extractorAttributes.put("source.history.loose-range", "time, path");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select count(*) from root.** group by level=0",
+          "count(root.*.*.*),",
+          Collections.singleton("4,"));
+    }
+  }
+
   private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count) 
{
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv, "count timeseries", "count(timeseries),", 
Collections.singleton(count + ","));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index dc86e82e361..b1e2649e6db 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -45,6 +46,8 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,13 +62,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
@@ -100,6 +107,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
   private long historicalDataExtractionTimeLowerBound; // Arrival time
 
+  private boolean sloppyPattern;
   private boolean sloppyTimeRange; // true to disable time range filter after 
extraction
 
   private Pair<Boolean, Boolean> listeningOptionPair;
@@ -123,6 +131,27 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       throw new PipeParameterNotValidException(e.getMessage());
     }
 
+    final Set<String> sloppyOptionSet =
+        Arrays.stream(
+                parameters
+                    .getStringOrDefault(
+                        Arrays.asList(
+                            EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, 
SOURCE_HISTORY_LOOSE_RANGE_KEY),
+                        EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE)
+                    .split(","))
+            .map(String::trim)
+            .map(String::toLowerCase)
+            .collect(Collectors.toSet());
+    // Avoid empty string
+    sloppyOptionSet.remove("");
+    sloppyTimeRange = 
sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE);
+    sloppyPattern = 
sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE);
+    if (!sloppyOptionSet.isEmpty()) {
+      throw new PipeParameterNotValidException(
+          String.format(
+              "Parameters in set %s are not allowed in 'history.loose-range'", 
sloppyOptionSet));
+    }
+
     if (parameters.hasAnyAttributes(
         SOURCE_START_TIME_KEY,
         EXTRACTOR_START_TIME_KEY,
@@ -280,19 +309,6 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       }
     }
 
-    sloppyTimeRange =
-        Arrays.stream(
-                parameters
-                    .getStringOrDefault(
-                        Arrays.asList(
-                            EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, 
SOURCE_HISTORY_LOOSE_RANGE_KEY),
-                        "")
-                    .split(","))
-            .map(String::trim)
-            .map(String::toLowerCase)
-            .collect(Collectors.toSet())
-            .contains("time");
-
     shouldTransferModFile =
         parameters.getBooleanOrDefault(
             Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
@@ -309,17 +325,20 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                 PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
             
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
 
-    LOGGER.info(
-        "Pipe {}@{}: historical data extraction time range, start time {}({}), 
end time {}({}), sloppy time range {}, should transfer mod file {}, should 
terminate pipe on all historical events consumed {}",
-        pipeName,
-        dataRegionId,
-        DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
-        historicalDataExtractionStartTime,
-        DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
-        historicalDataExtractionEndTime,
-        sloppyTimeRange,
-        shouldTransferModFile,
-        shouldTerminatePipeOnAllHistoricalEventsConsumed);
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info(
+          "Pipe {}@{}: historical data extraction time range, start time 
{}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should 
transfer mod file {}, should terminate pipe on all historical events consumed 
{}",
+          pipeName,
+          dataRegionId,
+          DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
+          historicalDataExtractionStartTime,
+          DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
+          historicalDataExtractionEndTime,
+          sloppyPattern,
+          sloppyTimeRange,
+          shouldTransferModFile,
+          shouldTerminatePipeOnAllHistoricalEventsConsumed);
+    }
   }
 
   private void flushDataRegionAllTsFiles() {
@@ -399,7 +418,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                         !resource.isClosed()
                             || mayTsFileContainUnprocessedData(resource)
                                 && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                                && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
+                                && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
+                                && 
mayTsFileResourceOverlappedWithPattern(resource))
                 .collect(Collectors.toList());
         resourceList.addAll(sequenceTsFileResources);
 
@@ -412,7 +432,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                         !resource.isClosed()
                             || mayTsFileContainUnprocessedData(resource)
                                 && 
isTsFileResourceOverlappedWithTimeRange(resource)
-                                && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
+                                && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
+                                && 
mayTsFileResourceOverlappedWithPattern(resource))
                 .collect(Collectors.toList());
         resourceList.addAll(unsequenceTsFileResources);
 
@@ -474,6 +495,35 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
   }
 
+  private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource 
resource) {
+    if (!sloppyPattern) {
+      return true;
+    }
+
+    final Set<IDeviceID> deviceSet;
+    try {
+      final Map<IDeviceID, Boolean> deviceIsAlignedMap =
+          PipeResourceManager.tsfile()
+              .getDeviceIsAlignedMapFromCache(
+                  
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+      deviceSet =
+          Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() : 
resource.getDevices();
+    } catch (final IOException e) {
+      LOGGER.warn(
+          "Pipe {}@{}: failed to get devices from TsFile {}, extract it 
anyway",
+          pipeName,
+          dataRegionId,
+          resource.getTsFilePath(),
+          e);
+      return true;
+    }
+
+    return deviceSet.stream()
+        .anyMatch(
+            // TODO: use IDeviceID
+            deviceID -> pipePattern.mayOverlapWithDevice(((PlainDeviceID) 
deviceID).toStringID()));
+  }
+
   private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource 
resource) {
     return !(resource.getFileEndTime() < historicalDataExtractionStartTime
         || historicalDataExtractionEndTime < resource.getFileStartTime());
@@ -530,7 +580,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             pipePattern,
             historicalDataExtractionStartTime,
             historicalDataExtractionEndTime);
-    if (isDbNameCoveredByPattern) {
+    if (sloppyPattern || isDbNameCoveredByPattern) {
       event.skipParsingPattern();
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 0d064cd8b06..5e75e068fa2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -71,7 +71,7 @@ public class PipeTsFileResourceManager {
       } else {
         LOGGER.warn("failed to try lock when checking TTL because of timeout 
({}s)", timeout);
       }
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.warn("failed to try lock when checking TTL because of 
interruption", e);
     }
@@ -102,7 +102,7 @@ public class PipeTsFileResourceManager {
                       entry.getKey(),
                       entry.getValue().getReferenceCount()));
         }
-      } catch (IOException e) {
+      } catch (final IOException e) {
         LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", 
e);
       }
     }
@@ -129,7 +129,8 @@ public class PipeTsFileResourceManager {
    * @return the hardlink or copied file
    * @throws IOException when create hardlink or copy file failed
    */
-  public File increaseFileReference(File file, boolean isTsFile, 
TsFileResource tsFileResource)
+  public File increaseFileReference(
+      final File file, final boolean isTsFile, final TsFileResource 
tsFileResource)
       throws IOException {
     lock.lock();
     try {
@@ -165,7 +166,7 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  private boolean increaseReferenceIfExists(String path) {
+  private boolean increaseReferenceIfExists(final String path) {
     final PipeTsFileResource resource = 
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path);
     if (resource != null) {
       resource.increaseAndGetReference();
@@ -174,10 +175,10 @@ public class PipeTsFileResourceManager {
     return false;
   }
 
-  private static File getHardlinkOrCopiedFileInPipeDir(File file) throws 
IOException {
+  public static File getHardlinkOrCopiedFileInPipeDir(final File file) throws 
IOException {
     try {
       return new File(getPipeTsFileDirPath(file), getRelativeFilePath(file));
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new IOException(
           String.format(
               "failed to get hardlink or copied file in pipe dir "
@@ -218,7 +219,7 @@ public class PipeTsFileResourceManager {
    *
    * @param hardlinkOrCopiedFile the copied or hardlinked file
    */
-  public void decreaseFileReference(File hardlinkOrCopiedFile) {
+  public void decreaseFileReference(final File hardlinkOrCopiedFile) {
     lock.lock();
     try {
       final String filePath = hardlinkOrCopiedFile.getPath();
@@ -237,7 +238,7 @@ public class PipeTsFileResourceManager {
    * @param hardlinkOrCopiedFile the copied or hardlinked file
    * @return the reference count of the file
    */
-  public int getFileReferenceCount(File hardlinkOrCopiedFile) {
+  public int getFileReferenceCount(final File hardlinkOrCopiedFile) {
     lock.lock();
     try {
       final String filePath = hardlinkOrCopiedFile.getPath();
@@ -254,7 +255,7 @@ public class PipeTsFileResourceManager {
    * @return {@code true} if the maps are successfully put into cache or 
already cached. {@code
    *     false} if they can not be cached.
    */
-  public boolean cacheObjectsIfAbsent(File hardlinkOrCopiedTsFile) throws 
IOException {
+  public boolean cacheObjectsIfAbsent(final File hardlinkOrCopiedTsFile) 
throws IOException {
     lock.lock();
     try {
       final PipeTsFileResource resource =
@@ -265,8 +266,8 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache(File 
hardlinkOrCopiedTsFile)
-      throws IOException {
+  public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache(
+      final File hardlinkOrCopiedTsFile) throws IOException {
     lock.lock();
     try {
       final PipeTsFileResource resource =
@@ -277,7 +278,7 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(File 
hardlinkOrCopiedTsFile)
+  public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(final File 
hardlinkOrCopiedTsFile)
       throws IOException {
     lock.lock();
     try {
@@ -289,8 +290,8 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(File 
hardlinkOrCopiedTsFile)
-      throws IOException {
+  public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(
+      final File hardlinkOrCopiedTsFile) throws IOException {
     lock.lock();
     try {
       final PipeTsFileResource resource =
@@ -301,7 +302,8 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  public void pinTsFileResource(TsFileResource resource, boolean withMods) 
throws IOException {
+  public void pinTsFileResource(final TsFileResource resource, final boolean 
withMods)
+      throws IOException {
     lock.lock();
     try {
       increaseFileReference(resource.getTsFile(), true, resource);
@@ -313,13 +315,13 @@ public class PipeTsFileResourceManager {
     }
   }
 
-  public void unpinTsFileResource(TsFileResource resource) throws IOException {
+  public void unpinTsFileResource(final TsFileResource resource) throws 
IOException {
     lock.lock();
     try {
-      File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
+      final File pinnedFile = 
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
       decreaseFileReference(pinnedFile);
 
-      File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
+      final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
       if (modFile.exists()) {
         decreaseFileReference(modFile);
       }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index a67b594d361..99efcd48d61 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -64,6 +64,9 @@ public class PipeExtractorConstant {
   public static final String SOURCE_HISTORY_END_TIME_KEY = 
"source.history.end-time";
   public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_KEY = 
"extractor.history.loose-range";
   public static final String SOURCE_HISTORY_LOOSE_RANGE_KEY = 
"source.history.loose-range";
+  public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE = "time";
+  public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE = "path";
+  public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE = "";
   public static final String EXTRACTOR_MODS_ENABLE_KEY = 
"extractor.mods.enable";
   public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable";
   public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;

Reply via email to