This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 04b5ba4c63f Load & Region Migrate: Notify all DNs before and after RM 
(#15032) (#15131)
04b5ba4c63f is described below

commit 04b5ba4c63f31c0f2d2961e8dffdc83a83ec04ea
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Mar 25 10:30:42 2025 +0800

    Load & Region Migrate: Notify all DNs before and after RM (#15032) (#15131)
    
    (cherry picked from commit 50a48ce42b3c0a608fca49c326e364e3226f3a0d)
---
 .../client/async/CnToDnAsyncRequestType.java       |   1 +
 .../CnToDnInternalServiceAsyncRequestManager.java  |   6 +
 .../rpc/DataNodeAsyncRequestRPCHandler.java        |   1 +
 .../procedure/env/ConfigNodeProcedureEnv.java      |  15 +++
 .../region/NotifyRegionMigrationProcedure.java     | 137 +++++++++++++++++++++
 .../impl/region/RegionMigrateProcedure.java        |   2 +
 .../state/NotifyRegionMigrationState.java          |  24 ++++
 .../procedure/store/ProcedureFactory.java          |   7 ++
 .../confignode/procedure/store/ProcedureType.java  |   1 +
 .../impl/DataNodeInternalRPCServiceImpl.java       |   7 ++
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  10 ++
 .../iotdb/db/service/RegionMigrateService.java     |  18 +++
 .../src/main/thrift/datanode.thrift                |  10 ++
 13 files changed, 239 insertions(+)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index e5d1515d27e..225d4341913 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -39,6 +39,7 @@ public enum CnToDnAsyncRequestType {
   CREATE_SCHEMA_REGION,
   DELETE_REGION,
   RESET_PEER_LIST,
+  NOTIFY_REGION_MIGRATION,
   UPDATE_REGION_ROUTE_MAP,
   CHANGE_REGION_LEADER,
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 1bae93c7399..59f99fe57cb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -71,6 +71,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
@@ -245,6 +246,11 @@ public class CnToDnInternalServiceAsyncRequestManager
         CnToDnAsyncRequestType.UPDATE_REGION_ROUTE_MAP,
         (req, client, handler) ->
             client.updateRegionCache((TRegionRouteReq) req, 
(DataNodeTSStatusRPCHandler) handler));
+    actionMapBuilder.put(
+        CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION,
+        (req, client, handler) ->
+            client.notifyRegionMigration(
+                (TNotifyRegionMigrationReq) req, (DataNodeTSStatusRPCHandler) 
handler));
     actionMapBuilder.put(
         CnToDnAsyncRequestType.CHANGE_REGION_LEADER,
         (req, client, handler) ->
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index dc8f14cfaa9..6e2a9dc97cc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -202,6 +202,7 @@ public abstract class 
DataNodeAsyncRequestRPCHandler<Response>
       case STOP_REPAIR_DATA:
       case LOAD_CONFIGURATION:
       case SET_SYSTEM_STATUS:
+      case NOTIFY_REGION_MIGRATION:
       case UPDATE_REGION_ROUTE_MAP:
       case INVALIDATE_SCHEMA_CACHE:
       case INVALIDATE_MATCHED_SCHEMA_CACHE:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index b6f4fe35d9e..a21d2349ed9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -70,6 +70,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
@@ -522,6 +523,20 @@ public class ConfigNodeProcedureEnv {
     return req;
   }
 
+  public List<TSStatus> notifyRegionMigrationToAllDataNodes(
+      TConsensusGroupId consensusGroupId, boolean isStart) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TNotifyRegionMigrationReq request =
+        new TNotifyRegionMigrationReq(consensusGroupId, isStart);
+
+    final DataNodeAsyncRequestContext<TNotifyRegionMigrationReq, TSStatus> 
clientHandler =
+        new DataNodeAsyncRequestContext<>(
+            CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION, request, 
dataNodeLocationMap);
+    
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
+    return clientHandler.getResponseList();
+  }
+
   public void persistRegionGroup(CreateRegionGroupsPlan 
createRegionGroupsPlan) {
     // Persist the allocation result
     try {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
new file mode 100644
index 00000000000..0160e9116f9
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.state.NotifyRegionMigrationState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** A procedure that notifies all DNs of the ongoing region migration 
procedure. */
+public class NotifyRegionMigrationProcedure
+    extends RegionOperationProcedure<NotifyRegionMigrationState> {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(NotifyRegionMigrationProcedure.class);
+
+  private boolean isStart;
+
+  public NotifyRegionMigrationProcedure() {
+    super();
+  }
+
+  public NotifyRegionMigrationProcedure(TConsensusGroupId consensusGroupId, 
boolean isStart) {
+    super(consensusGroupId);
+    this.isStart = isStart;
+  }
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, 
NotifyRegionMigrationState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
+    if (regionId == null) {
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      LOGGER.info(
+          "[pid{}][NotifyRegionMigration] started, region id is {}.", 
getProcId(), regionId);
+      env.notifyRegionMigrationToAllDataNodes(regionId, isStart);
+    } catch (Exception e) {
+      LOGGER.error("[pid{}][NotifyRegionMigration] state {} failed", 
getProcId(), state, e);
+      return Flow.NO_MORE_STATE;
+    }
+    LOGGER.info("[pid{}][NotifyRegionMigration] state {} complete", 
getProcId(), state);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(
+      ConfigNodeProcedureEnv configNodeProcedureEnv, 
NotifyRegionMigrationState state)
+      throws IOException, InterruptedException, ProcedureException {}
+
+  @Override
+  protected NotifyRegionMigrationState getState(int stateId) {
+    return NotifyRegionMigrationState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(NotifyRegionMigrationState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected NotifyRegionMigrationState getInitialState() {
+    return NotifyRegionMigrationState.INIT;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    
stream.writeShort(ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+    ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream);
+    stream.writeBoolean(isStart);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    try {
+      regionId = 
ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+      isStart = (byteBuffer.get() != (byte) 0);
+    } catch (ThriftSerDeException e) {
+      LOGGER.error("Error in deserialize {}", this.getClass(), e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof NotifyRegionMigrationProcedure)) {
+      return false;
+    }
+    NotifyRegionMigrationProcedure procedure = 
(NotifyRegionMigrationProcedure) obj;
+    return this.regionId.equals(procedure.regionId) && this.isStart == 
procedure.isStart;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(regionId, isStart);
+  }
+
+  @Override
+  public String toString() {
+    return "NotifyRegionMigrationProcedure{"
+        + "regionId="
+        + regionId
+        + ", isStart="
+        + isStart
+        + '}';
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index f4d5dce26b6..1acdd2ccd54 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -83,6 +83,7 @@ public class RegionMigrateProcedure extends 
RegionOperationProcedure<RegionTrans
               regionId,
               handler.simplifiedLocation(originalDataNode),
               handler.simplifiedLocation(destDataNode));
+          addChildProcedure(new NotifyRegionMigrationProcedure(regionId, 
true));
           setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
         case ADD_REGION_PEER:
@@ -124,6 +125,7 @@ public class RegionMigrateProcedure extends 
RegionOperationProcedure<RegionTrans
               CommonDateTimeUtils.convertMillisecondToDurationStr(
                   System.currentTimeMillis() - getSubmittedTime()),
               DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
+          addChildProcedure(new NotifyRegionMigrationProcedure(regionId, 
false));
           return Flow.NO_MORE_STATE;
         default:
           throw new ProcedureException("Unsupported state: " + state.name());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
new file mode 100644
index 00000000000..1b964621e5b
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state;
+
+public enum NotifyRegionMigrationState {
+  INIT,
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index e5d43137fac..fc88b54f3d5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
 import 
org.apache.iotdb.confignode.procedure.impl.region.AddRegionPeerProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure;
+import 
org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionPeerProcedure;
@@ -120,6 +121,10 @@ public class ProcedureFactory implements IProcedureFactory 
{
         break;
       case RECONSTRUCT_REGION_PROCEDURE:
         procedure = new ReconstructRegionProcedure();
+        break;
+      case NOTIFY_REGION_MIGRATION_PROCEDURE:
+        procedure = new NotifyRegionMigrationProcedure();
+        break;
       case DELETE_TIMESERIES_PROCEDURE:
         procedure = new DeleteTimeSeriesProcedure(false);
         break;
@@ -318,6 +323,8 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
     } else if (procedure instanceof ReconstructRegionProcedure) {
       return ProcedureType.RECONSTRUCT_REGION_PROCEDURE;
+    } else if (procedure instanceof NotifyRegionMigrationProcedure) {
+      return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE;
     } else if (procedure instanceof CreateTriggerProcedure) {
       return ProcedureType.CREATE_TRIGGER_PROCEDURE;
     } else if (procedure instanceof DropTriggerProcedure) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 82b28186756..48ccca42d44 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -39,6 +39,7 @@ public enum ProcedureType {
   RECONSTRUCT_REGION_PROCEDURE((short) 203),
   ADD_REGION_PEER_PROCEDURE((short) 204),
   REMOVE_REGION_PEER_PROCEDURE((short) 205),
+  NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206),
   @TestOnly
   CREATE_MANY_DATABASES_PROCEDURE((short) 250),
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b90efed22ac..d0924477154 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -207,6 +207,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
@@ -2144,6 +2145,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return RegionMigrateService.getInstance().getRegionMaintainResult(taskId);
   }
 
+  @Override
+  public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws 
TException {
+    RegionMigrateService.getInstance().notifyRegionMigration(req);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   private TSStatus createNewRegion(ConsensusGroupId regionId, String 
storageGroup) {
     return regionManager.createNewRegion(regionId, storageGroup);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 04d38f25f89..0af4d91adfe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -56,6 +56,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.service.RegionMigrateService;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -182,6 +183,8 @@ public class LoadTsFileScheduler implements IScheduler {
           }
           shouldRemoveFileFromLoadingSet = true;
 
+          final long startTimeMs = System.currentTimeMillis();
+
           if (node.isTsFileEmpty()) {
             LOGGER.info("Load skip TsFile {}, because it has no data.", 
filePath);
           } else if (!node.needDecodeTsFile(
@@ -226,6 +229,13 @@ public class LoadTsFileScheduler implements IScheduler {
             }
           }
 
+          if (RegionMigrateService.getInstance().getLastNotifyTime() > 
startTimeMs) {
+            LOGGER.warn(
+                "LoadTsFileScheduler: Region migration started or ended during 
loading TsFile {}, will convert to insertion to avoid data loss",
+                filePath);
+            isLoadSingleTsFileSuccess = false;
+          }
+
           if (isLoadSingleTsFileSuccess) {
             node.clean();
             LOGGER.info(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 056df802c13..05328ea91b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
 import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -53,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class RegionMigrateService implements IService {
@@ -72,6 +74,9 @@ public class RegionMigrateService implements IService {
   // where different asynchronous tasks are submitted to the same datanode 
within a single procedure
   private static final ConcurrentHashMap<Long, TRegionMigrateResult> 
taskResultMap =
       new ConcurrentHashMap<>();
+
+  private static final AtomicLong lastNotifyTime = new 
AtomicLong(Long.MIN_VALUE);
+
   private static final TRegionMigrateResult unfinishedResult = new 
TRegionMigrateResult();
 
   private RegionMigrateService() {}
@@ -80,6 +85,19 @@ public class RegionMigrateService implements IService {
     return Holder.INSTANCE;
   }
 
+  public void notifyRegionMigration(TNotifyRegionMigrationReq req) {
+    lastNotifyTime.set(System.currentTimeMillis());
+    if (req.isIsStart()) {
+      LOGGER.info("Region {} is notified to begin migrating", 
req.getRegionId());
+    } else {
+      LOGGER.info("Region {} is notified to finish migrating", 
req.getRegionId());
+    }
+  }
+
+  public long getLastNotifyTime() {
+    return lastNotifyTime.get();
+  }
+
   /**
    * Submit AddRegionPeerTask
    *
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 4b25e08d1f6..bef96496650 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -52,6 +52,11 @@ struct TRegionMigrateResult {
   4: required common.TRegionMaintainTaskStatus taskStatus
 }
 
+struct TNotifyRegionMigrationReq {
+  1: required common.TConsensusGroupId regionId
+  2: required bool isStart
+}
+
 struct TCreatePeerReq {
   1: required common.TConsensusGroupId regionId
   2: required list<common.TDataNodeLocation> regionLocations
@@ -781,6 +786,11 @@ service IDataNodeRPCService {
    */
   TRegionMigrateResult getRegionMaintainResult(i64 taskId)
 
+    /**
+     * Notify the DataNode of the beginning or ending the migration of the 
specified RegionGroup
+     */
+    common.TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req)
+
   /**
    * Config node will clean DataNode cache, the Data node will not accept 
read/write request when disabled
    * @param data node location

Reply via email to