This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cdcd4c3be42 [IOTDB-5984] Pipe: remove unnecessary pipe task creation 
logic on schema region (#10108)
cdcd4c3be42 is described below

commit cdcd4c3be42135f507b7a8ef435fb03ec6c2a521
Author: yschengzi <[email protected]>
AuthorDate: Sun Jun 11 02:06:32 2023 +0800

    [IOTDB-5984] Pipe: remove unnecessary pipe task creation logic on schema 
region (#10108)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/runtime/PipeRuntimeCoordinator.java       | 23 +++++++++++-----------
 .../impl/pipe/task/CreatePipeProcedureV2.java      | 21 ++++++++++++--------
 .../PipeHistoricalDataRegionTsFileCollector.java   |  4 ++--
 3 files changed, 27 insertions(+), 21 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 1382b90851e..00182aac69d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -70,17 +70,18 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
     event
         .getLeaderMap()
         .forEach(
-            (regionId, pair) -> {
-              if (regionId.getType().equals(TConsensusGroupType.DataRegion)
-                  && !configManager
-                      .getPartitionManager()
-                      .getRegionStorageGroup(regionId)
-                      .equals(IoTDBMetricsUtils.DATABASE)) {
-                // pipe only collect user's data, filter metric database here.
-                dataRegionGroupToOldAndNewLeaderPairMap.put(
-                    regionId,
-                    new Pair<>( // null or -1 means empty origin leader
-                        pair.left == null ? -1 : pair.left, pair.right == null 
? -1 : pair.right));
+            (regionGroupId, pair) -> {
+              if 
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
+                final String databaseName =
+                    
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
+                if (databaseName != null && 
!databaseName.equals(IoTDBMetricsUtils.DATABASE)) {
+                  // pipe only collect user's data, filter metric database 
here.
+                  dataRegionGroupToOldAndNewLeaderPairMap.put(
+                      regionGroupId,
+                      new Pair<>( // null or -1 means empty origin leader
+                          pair.left == null ? -1 : pair.left,
+                          pair.right == null ? -1 : pair.right));
+                }
               }
             });
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index f8abb067054..746ca40b878 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -104,14 +105,18 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         .getLoadManager()
         .getRegionLeaderMap()
         .forEach(
-            (regionGroup, regionLeaderNodeId) -> {
-              if (!env.getConfigManager()
-                  .getPartitionManager()
-                  .getRegionStorageGroup(regionGroup)
-                  .equals(IoTDBMetricsUtils.DATABASE)) {
-                // pipe only collect user's data, filter metric database here.
-                consensusGroupIdToTaskMetaMap.put(
-                    regionGroup, new PipeTaskMeta(new MinimumProgressIndex(), 
regionLeaderNodeId));
+            (regionGroupId, regionLeaderNodeId) -> {
+              if 
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
+                final String databaseName =
+                    env.getConfigManager()
+                        .getPartitionManager()
+                        .getRegionStorageGroup(regionGroupId);
+                if (databaseName != null && 
!databaseName.equals(IoTDBMetricsUtils.DATABASE)) {
+                  // pipe only collect user's data, filter metric database 
here.
+                  consensusGroupIdToTaskMetaMap.put(
+                      regionGroupId,
+                      new PipeTaskMeta(new MinimumProgressIndex(), 
regionLeaderNodeId));
+                }
               }
             });
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 48b3a1c11ae..7d4bc25f2dc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -213,8 +213,8 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
   }
 
   private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource 
resource) {
-    return historicalDataCollectionStartTime <= resource.getFileEndTime()
-        || resource.getFileStartTime() <= historicalDataCollectionEndTime;
+    return !(resource.getFileEndTime() < historicalDataCollectionStartTime
+        || historicalDataCollectionEndTime < resource.getFileStartTime());
   }
 
   private boolean 
isTsFileGeneratedAfterCollectionTimeLowerBound(TsFileResource resource) {

Reply via email to