This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5984-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d643d903871ff63381399b5c68ec40031f968085 Author: yschengzi <[email protected]> AuthorDate: Sun Jun 11 02:06:32 2023 +0800 [IOTDB-5984] Pipe: remove unnecessary pipe task creation logic on schema region (#10108) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit cdcd4c3be42135f507b7a8ef435fb03ec6c2a521) --- .../pipe/runtime/PipeRuntimeCoordinator.java | 23 +++++++++++----------- .../impl/pipe/task/CreatePipeProcedureV2.java | 21 ++++++++++++-------- .../PipeHistoricalDataRegionTsFileCollector.java | 4 ++-- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java index 1382b90851e..00182aac69d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java @@ -70,17 +70,18 @@ public class PipeRuntimeCoordinator implements IClusterStatusSubscriber { event .getLeaderMap() .forEach( - (regionId, pair) -> { - if (regionId.getType().equals(TConsensusGroupType.DataRegion) - && !configManager - .getPartitionManager() - .getRegionStorageGroup(regionId) - .equals(IoTDBMetricsUtils.DATABASE)) { - // pipe only collect user's data, filter metric database here. - dataRegionGroupToOldAndNewLeaderPairMap.put( - regionId, - new Pair<>( // null or -1 means empty origin leader - pair.left == null ? -1 : pair.left, pair.right == null ? -1 : pair.right)); + (regionGroupId, pair) -> { + if (regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) { + final String databaseName = + configManager.getPartitionManager().getRegionStorageGroup(regionGroupId); + if (databaseName != null && !databaseName.equals(IoTDBMetricsUtils.DATABASE)) { + // pipe only collect user's data, filter metric database here. + dataRegionGroupToOldAndNewLeaderPairMap.put( + regionGroupId, + new Pair<>( // null or -1 means empty origin leader + pair.left == null ? -1 : pair.left, + pair.right == null ? -1 : pair.right)); + } } }); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index f8abb067054..746ca40b878 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta; @@ -104,14 +105,18 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { .getLoadManager() .getRegionLeaderMap() .forEach( - (regionGroup, regionLeaderNodeId) -> { - if (!env.getConfigManager() - .getPartitionManager() - .getRegionStorageGroup(regionGroup) - .equals(IoTDBMetricsUtils.DATABASE)) { - // pipe only collect user's data, filter metric database here. - consensusGroupIdToTaskMetaMap.put( - regionGroup, new PipeTaskMeta(new MinimumProgressIndex(), regionLeaderNodeId)); + (regionGroupId, regionLeaderNodeId) -> { + if (regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) { + final String databaseName = + env.getConfigManager() + .getPartitionManager() + .getRegionStorageGroup(regionGroupId); + if (databaseName != null && !databaseName.equals(IoTDBMetricsUtils.DATABASE)) { + // pipe only collect user's data, filter metric database here. + consensusGroupIdToTaskMetaMap.put( + regionGroupId, + new PipeTaskMeta(new MinimumProgressIndex(), regionLeaderNodeId)); + } } }); pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java index f39923fd67a..10b1247620e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java @@ -204,8 +204,8 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR } private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource resource) { - return historicalDataCollectionStartTime <= resource.getFileEndTime() - || resource.getFileStartTime() <= historicalDataCollectionEndTime; + return !(resource.getFileEndTime() < historicalDataCollectionStartTime + || historicalDataCollectionEndTime < resource.getFileStartTime()); } private boolean isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {
