This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cf4aed79c5d Pipe: Support `"source.history.loose-range" = "path"` in iotdb-source (#12651)
cf4aed79c5d is described below
commit cf4aed79c5ddabba277ddcdf6275992408e7f817
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 12 21:20:51 2024 +0800
Pipe: Support `"source.history.loose-range" = "path"` in iotdb-source (#12651)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeExtractorIT.java | 97 +++++++++++++++++++
.../PipeHistoricalDataRegionTsFileExtractor.java | 104 +++++++++++++++------
.../resource/tsfile/PipeTsFileResourceManager.java | 38 ++++----
.../config/constant/PipeExtractorConstant.java | 3 +
4 files changed, 197 insertions(+), 45 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index 1befb2b765d..f83699f30b7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -24,7 +24,9 @@ import
org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
@@ -32,6 +34,7 @@ import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -51,6 +54,39 @@ import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeExtractorIT extends AbstractPipeDualAutoIT {
+
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+
+ // TODO: delete ratis configurations
+ senderEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ // Disable sender compaction for tsfile determination in loose range test
+ .setEnableSeqSpaceCompaction(false)
+ .setEnableUnseqSpaceCompaction(false)
+ .setEnableCrossSpaceCompaction(false);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setAutoCreateSchemaEnabled(true)
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
+
+ // 10 min, assert that the operations will not time out
+ senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+ receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
+
+ senderEnv.initClusterEnvironment();
+ receiverEnv.initClusterEnvironment();
+ }
+
@Test
public void testExtractorValidParameter() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -867,6 +903,67 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
}
}
+ @Test
+ public void testLooseRange() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ // TsFile 1, extracted without parse
+ "insert into root.db.d1 (time, at1, at2)" + " values (1000, 1,
2), (2000, 3, 4)",
+ // TsFile 2, not extracted because pattern not overlapped
+ "insert into root.db1.d1 (time, at1, at2)" + " values (1000, 1,
2), (2000, 3, 4)",
+ "flush"))) {
+ return;
+ }
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ // TsFile 3, not extracted because time range not overlapped
+ "insert into root.db.d1 (time, at1, at2)" + " values (3000, 1,
2), (4000, 3, 4)",
+ "flush"))) {
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("source.path", "root.db.d1.at1");
+ extractorAttributes.put("source.inclusion", "data.insert");
+ extractorAttributes.put("source.history.start-time", "1500");
+ extractorAttributes.put("source.history.end-time", "2500");
+ extractorAttributes.put("source.history.loose-range", "time, path");
+
+ connectorAttributes.put("connector", "iotdb-thrift-connector");
+ connectorAttributes.put("connector.batch.enable", "false");
+ connectorAttributes.put("connector.ip", receiverIp);
+ connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+
+ TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("p1", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("p1").getCode());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.** group by level=0",
+ "count(root.*.*.*),",
+ Collections.singleton("4,"));
+ }
+ }
+
private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count)
{
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count timeseries", "count(timeseries),",
Collections.singleton(count + ","));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index dc86e82e361..b1e2649e6db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -45,6 +46,8 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,13 +62,17 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
+import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
@@ -100,6 +107,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
private long historicalDataExtractionTimeLowerBound; // Arrival time
+ private boolean sloppyPattern;
private boolean sloppyTimeRange; // true to disable time range filter after
extraction
private Pair<Boolean, Boolean> listeningOptionPair;
@@ -123,6 +131,27 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
throw new PipeParameterNotValidException(e.getMessage());
}
+ final Set<String> sloppyOptionSet =
+ Arrays.stream(
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ EXTRACTOR_HISTORY_LOOSE_RANGE_KEY,
SOURCE_HISTORY_LOOSE_RANGE_KEY),
+ EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE)
+ .split(","))
+ .map(String::trim)
+ .map(String::toLowerCase)
+ .collect(Collectors.toSet());
+ // Avoid empty string
+ sloppyOptionSet.remove("");
+ sloppyTimeRange =
sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE);
+ sloppyPattern =
sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE);
+ if (!sloppyOptionSet.isEmpty()) {
+ throw new PipeParameterNotValidException(
+ String.format(
+ "Parameters in set %s are not allowed in 'history.loose-range'",
sloppyOptionSet));
+ }
+
if (parameters.hasAnyAttributes(
SOURCE_START_TIME_KEY,
EXTRACTOR_START_TIME_KEY,
@@ -280,19 +309,6 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
}
}
- sloppyTimeRange =
- Arrays.stream(
- parameters
- .getStringOrDefault(
- Arrays.asList(
- EXTRACTOR_HISTORY_LOOSE_RANGE_KEY,
SOURCE_HISTORY_LOOSE_RANGE_KEY),
- "")
- .split(","))
- .map(String::trim)
- .map(String::toLowerCase)
- .collect(Collectors.toSet())
- .contains("time");
-
shouldTransferModFile =
parameters.getBooleanOrDefault(
Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
@@ -309,17 +325,20 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);
- LOGGER.info(
- "Pipe {}@{}: historical data extraction time range, start time {}({}),
end time {}({}), sloppy time range {}, should transfer mod file {}, should
terminate pipe on all historical events consumed {}",
- pipeName,
- dataRegionId,
- DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
- historicalDataExtractionStartTime,
- DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
- historicalDataExtractionEndTime,
- sloppyTimeRange,
- shouldTransferModFile,
- shouldTerminatePipeOnAllHistoricalEventsConsumed);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info(
+ "Pipe {}@{}: historical data extraction time range, start time
{}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should
transfer mod file {}, should terminate pipe on all historical events consumed
{}",
+ pipeName,
+ dataRegionId,
+ DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
+ historicalDataExtractionStartTime,
+ DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
+ historicalDataExtractionEndTime,
+ sloppyPattern,
+ sloppyTimeRange,
+ shouldTransferModFile,
+ shouldTerminatePipeOnAllHistoricalEventsConsumed);
+ }
}
private void flushDataRegionAllTsFiles() {
@@ -399,7 +418,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&&
isTsFileResourceOverlappedWithTimeRange(resource)
- &&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
+ &&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
+ &&
mayTsFileResourceOverlappedWithPattern(resource))
.collect(Collectors.toList());
resourceList.addAll(sequenceTsFileResources);
@@ -412,7 +432,8 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&&
isTsFileResourceOverlappedWithTimeRange(resource)
- &&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
+ &&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
+ &&
mayTsFileResourceOverlappedWithPattern(resource))
.collect(Collectors.toList());
resourceList.addAll(unsequenceTsFileResources);
@@ -474,6 +495,35 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
}
+ private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource
resource) {
+ if (!sloppyPattern) {
+ return true;
+ }
+
+ final Set<IDeviceID> deviceSet;
+ try {
+ final Map<IDeviceID, Boolean> deviceIsAlignedMap =
+ PipeResourceManager.tsfile()
+ .getDeviceIsAlignedMapFromCache(
+
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+ deviceSet =
+ Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() :
resource.getDevices();
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "Pipe {}@{}: failed to get devices from TsFile {}, extract it
anyway",
+ pipeName,
+ dataRegionId,
+ resource.getTsFilePath(),
+ e);
+ return true;
+ }
+
+ return deviceSet.stream()
+ .anyMatch(
+ // TODO: use IDeviceID
+ deviceID -> pipePattern.mayOverlapWithDevice(((PlainDeviceID)
deviceID).toStringID()));
+ }
+
private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource
resource) {
return !(resource.getFileEndTime() < historicalDataExtractionStartTime
|| historicalDataExtractionEndTime < resource.getFileStartTime());
@@ -530,7 +580,7 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
pipePattern,
historicalDataExtractionStartTime,
historicalDataExtractionEndTime);
- if (isDbNameCoveredByPattern) {
+ if (sloppyPattern || isDbNameCoveredByPattern) {
event.skipParsingPattern();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 0d064cd8b06..5e75e068fa2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -71,7 +71,7 @@ public class PipeTsFileResourceManager {
} else {
LOGGER.warn("failed to try lock when checking TTL because of timeout
({}s)", timeout);
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("failed to try lock when checking TTL because of
interruption", e);
}
@@ -102,7 +102,7 @@ public class PipeTsFileResourceManager {
entry.getKey(),
entry.getValue().getReferenceCount()));
}
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ",
e);
}
}
@@ -129,7 +129,8 @@ public class PipeTsFileResourceManager {
* @return the hardlink or copied file
* @throws IOException when create hardlink or copy file failed
*/
- public File increaseFileReference(File file, boolean isTsFile,
TsFileResource tsFileResource)
+ public File increaseFileReference(
+ final File file, final boolean isTsFile, final TsFileResource
tsFileResource)
throws IOException {
lock.lock();
try {
@@ -165,7 +166,7 @@ public class PipeTsFileResourceManager {
}
}
- private boolean increaseReferenceIfExists(String path) {
+ private boolean increaseReferenceIfExists(final String path) {
final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path);
if (resource != null) {
resource.increaseAndGetReference();
@@ -174,10 +175,10 @@ public class PipeTsFileResourceManager {
return false;
}
- private static File getHardlinkOrCopiedFileInPipeDir(File file) throws
IOException {
+ public static File getHardlinkOrCopiedFileInPipeDir(final File file) throws
IOException {
try {
return new File(getPipeTsFileDirPath(file), getRelativeFilePath(file));
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new IOException(
String.format(
"failed to get hardlink or copied file in pipe dir "
@@ -218,7 +219,7 @@ public class PipeTsFileResourceManager {
*
* @param hardlinkOrCopiedFile the copied or hardlinked file
*/
- public void decreaseFileReference(File hardlinkOrCopiedFile) {
+ public void decreaseFileReference(final File hardlinkOrCopiedFile) {
lock.lock();
try {
final String filePath = hardlinkOrCopiedFile.getPath();
@@ -237,7 +238,7 @@ public class PipeTsFileResourceManager {
* @param hardlinkOrCopiedFile the copied or hardlinked file
* @return the reference count of the file
*/
- public int getFileReferenceCount(File hardlinkOrCopiedFile) {
+ public int getFileReferenceCount(final File hardlinkOrCopiedFile) {
lock.lock();
try {
final String filePath = hardlinkOrCopiedFile.getPath();
@@ -254,7 +255,7 @@ public class PipeTsFileResourceManager {
* @return {@code true} if the maps are successfully put into cache or
already cached. {@code
* false} if they can not be cached.
*/
- public boolean cacheObjectsIfAbsent(File hardlinkOrCopiedTsFile) throws
IOException {
+ public boolean cacheObjectsIfAbsent(final File hardlinkOrCopiedTsFile)
throws IOException {
lock.lock();
try {
final PipeTsFileResource resource =
@@ -265,8 +266,8 @@ public class PipeTsFileResourceManager {
}
}
- public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache(File
hardlinkOrCopiedTsFile)
- throws IOException {
+ public Map<IDeviceID, List<String>> getDeviceMeasurementsMapFromCache(
+ final File hardlinkOrCopiedTsFile) throws IOException {
lock.lock();
try {
final PipeTsFileResource resource =
@@ -277,7 +278,7 @@ public class PipeTsFileResourceManager {
}
}
- public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(File
hardlinkOrCopiedTsFile)
+ public Map<IDeviceID, Boolean> getDeviceIsAlignedMapFromCache(final File
hardlinkOrCopiedTsFile)
throws IOException {
lock.lock();
try {
@@ -289,8 +290,8 @@ public class PipeTsFileResourceManager {
}
}
- public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(File
hardlinkOrCopiedTsFile)
- throws IOException {
+ public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(
+ final File hardlinkOrCopiedTsFile) throws IOException {
lock.lock();
try {
final PipeTsFileResource resource =
@@ -301,7 +302,8 @@ public class PipeTsFileResourceManager {
}
}
- public void pinTsFileResource(TsFileResource resource, boolean withMods)
throws IOException {
+ public void pinTsFileResource(final TsFileResource resource, final boolean
withMods)
+ throws IOException {
lock.lock();
try {
increaseFileReference(resource.getTsFile(), true, resource);
@@ -313,13 +315,13 @@ public class PipeTsFileResourceManager {
}
}
- public void unpinTsFileResource(TsFileResource resource) throws IOException {
+ public void unpinTsFileResource(final TsFileResource resource) throws
IOException {
lock.lock();
try {
- File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
+ final File pinnedFile =
getHardlinkOrCopiedFileInPipeDir(resource.getTsFile());
decreaseFileReference(pinnedFile);
- File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
+ final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX);
if (modFile.exists()) {
decreaseFileReference(modFile);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index a67b594d361..99efcd48d61 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -64,6 +64,9 @@ public class PipeExtractorConstant {
public static final String SOURCE_HISTORY_END_TIME_KEY =
"source.history.end-time";
public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_KEY =
"extractor.history.loose-range";
public static final String SOURCE_HISTORY_LOOSE_RANGE_KEY =
"source.history.loose-range";
+ public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE = "time";
+ public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE = "path";
+ public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE = "";
public static final String EXTRACTOR_MODS_ENABLE_KEY =
"extractor.mods.enable";
public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable";
public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;