This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d82dffdbc8e IoTConsensusV2: Fix npe when data region is not existed
(#14180)
d82dffdbc8e is described below
commit d82dffdbc8ece5589ccf9154c7fdedb37fbfde9f
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Nov 22 22:05:49 2024 +0800
IoTConsensusV2: Fix npe when data region is not existed (#14180)
---
.../pipeconsensus/PipeConsensusReceiver.java | 34 +++++++++++++++-------
1 file changed, 24 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 13cfdbbdf49..1e1ae84f3a2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -88,6 +88,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils.generateTsFileResource;
+
public class PipeConsensusReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusReceiver.class);
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
@@ -653,9 +655,19 @@ public class PipeConsensusReceiver {
private TSStatus loadFileToDataRegion(String filePath, ProgressIndex
progressIndex)
throws IOException, LoadFileException {
- StorageEngine.getInstance()
- .getDataRegion(((DataRegionId) consensusGroupId))
- .loadNewTsFile(generateTsFileResource(filePath, progressIndex), true,
false);
+ DataRegion region =
+ StorageEngine.getInstance().getDataRegion(((DataRegionId)
consensusGroupId));
+ if (region != null) {
+ TsFileResource resource = generateTsFileResource(filePath,
progressIndex);
+ region.loadNewTsFile(resource, true, false);
+ } else {
+ // A null data region indicates that the region has been removed or migrated.
In those cases, there
+ // is no need to replicate data, so we just return success to keep the leader
from retrying.
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: skip load tsfile-{} when sealing,
because this region has been removed or migrated.",
+ consensusPipeName,
+ filePath);
+ }
return RpcUtils.SUCCESS_STATUS;
}
@@ -689,14 +701,16 @@ public class PipeConsensusReceiver {
}
private void updateWritePointCountMetrics(long writePointCount) {
- final DataRegion dataRegion =
- StorageEngine.getInstance().getDataRegion(((DataRegionId)
consensusGroupId));
- dataRegion
- .getNonSystemDatabaseName()
+ Optional.ofNullable(
+ StorageEngine.getInstance().getDataRegion(((DataRegionId)
consensusGroupId)))
.ifPresent(
- databaseName ->
- LoadTsFileManager.updateWritePointCountMetrics(
- dataRegion, databaseName, writePointCount, true));
+ dataRegion ->
+ dataRegion
+ .getNonSystemDatabaseName()
+ .ifPresent(
+ databaseName ->
+ LoadTsFileManager.updateWritePointCountMetrics(
+ dataRegion, databaseName, writePointCount,
true)));
}
private TsFileResource generateTsFileResource(String filePath, ProgressIndex
progressIndex)