This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cdcd4c3be42 [IOTDB-5984] Pipe: remove unnecessary pipe task creation
logic on schema region (#10108)
cdcd4c3be42 is described below
commit cdcd4c3be42135f507b7a8ef435fb03ec6c2a521
Author: yschengzi <[email protected]>
AuthorDate: Sun Jun 11 02:06:32 2023 +0800
[IOTDB-5984] Pipe: remove unnecessary pipe task creation logic on schema
region (#10108)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/runtime/PipeRuntimeCoordinator.java | 23 +++++++++++-----------
.../impl/pipe/task/CreatePipeProcedureV2.java | 21 ++++++++++++--------
.../PipeHistoricalDataRegionTsFileCollector.java | 4 ++--
3 files changed, 27 insertions(+), 21 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 1382b90851e..00182aac69d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -70,17 +70,18 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
event
.getLeaderMap()
.forEach(
- (regionId, pair) -> {
- if (regionId.getType().equals(TConsensusGroupType.DataRegion)
- && !configManager
- .getPartitionManager()
- .getRegionStorageGroup(regionId)
- .equals(IoTDBMetricsUtils.DATABASE)) {
- // pipe only collect user's data, filter metric database here.
- dataRegionGroupToOldAndNewLeaderPairMap.put(
- regionId,
- new Pair<>( // null or -1 means empty origin leader
- pair.left == null ? -1 : pair.left, pair.right == null
? -1 : pair.right));
+ (regionGroupId, pair) -> {
+ if
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
+ final String databaseName =
+
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
+ if (databaseName != null &&
!databaseName.equals(IoTDBMetricsUtils.DATABASE)) {
+ // pipe only collect user's data, filter metric database
here.
+ dataRegionGroupToOldAndNewLeaderPairMap.put(
+ regionGroupId,
+ new Pair<>( // null or -1 means empty origin leader
+ pair.left == null ? -1 : pair.left,
+ pair.right == null ? -1 : pair.right));
+ }
}
});
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index f8abb067054..746ca40b878 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -104,14 +105,18 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
.getLoadManager()
.getRegionLeaderMap()
.forEach(
- (regionGroup, regionLeaderNodeId) -> {
- if (!env.getConfigManager()
- .getPartitionManager()
- .getRegionStorageGroup(regionGroup)
- .equals(IoTDBMetricsUtils.DATABASE)) {
- // pipe only collect user's data, filter metric database here.
- consensusGroupIdToTaskMetaMap.put(
- regionGroup, new PipeTaskMeta(new MinimumProgressIndex(),
regionLeaderNodeId));
+ (regionGroupId, regionLeaderNodeId) -> {
+ if
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
+ final String databaseName =
+ env.getConfigManager()
+ .getPartitionManager()
+ .getRegionStorageGroup(regionGroupId);
+ if (databaseName != null &&
!databaseName.equals(IoTDBMetricsUtils.DATABASE)) {
+ // pipe only collect user's data, filter metric database
here.
+ consensusGroupIdToTaskMetaMap.put(
+ regionGroupId,
+ new PipeTaskMeta(new MinimumProgressIndex(),
regionLeaderNodeId));
+ }
}
});
pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 48b3a1c11ae..7d4bc25f2dc 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -213,8 +213,8 @@ public class PipeHistoricalDataRegionTsFileCollector
extends PipeHistoricalDataR
}
private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource
resource) {
- return historicalDataCollectionStartTime <= resource.getFileEndTime()
- || resource.getFileStartTime() <= historicalDataCollectionEndTime;
+ return !(resource.getFileEndTime() < historicalDataCollectionStartTime
+ || historicalDataCollectionEndTime < resource.getFileStartTime());
}
private boolean
isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {