This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 81db737e325 Add lock for insertSeparatorToWAL (#14478)
81db737e325 is described below

commit 81db737e325d8cc4b1ecfd7339ff719fd314d5c2
Author: shuwenwei <[email protected]>
AuthorDate: Fri Dec 20 12:08:37 2024 +0800

    Add lock for insertSeparatorToWAL (#14478)
    
    * add lock for insertSeparatorToWAL
    
    * add lock for visitDeleteData
---
 .../dataregion/DataExecutionVisitor.java           |  4 +++
 .../deletion/persist/PageCacheDeletionBuffer.java  |  2 +-
 .../db/storageengine/dataregion/DataRegion.java    | 33 ++++++++++++++++++----
 .../wal/allocation/FirstCreateStrategy.java        |  1 +
 .../storageengine/dataregion/wal/node/WALNode.java |  8 +++++-
 5 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 3cc1acdbf76..c0fe1f3a14a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -242,6 +242,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
 
   @Override
   public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
+    dataRegion.writeLock("deleteData");
     try {
       for (MeasurementPath path : node.getPathList()) {
         MeasurementPath databaseToDelete =
@@ -262,6 +263,8 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
     } catch (IOException | IllegalPathException e) {
       LOGGER.error("Error in executing plan node: {}", node, e);
       return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+    } finally {
+      dataRegion.writeUnlock();
     }
   }
 
@@ -269,6 +272,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitDeleteData(RelationalDeleteDataNode node, DataRegion dataRegion) {
     try {
       dataRegion.deleteByTable(node);
+      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (IOException e) {
       LOGGER.error("Error in executing plan node: {}", node, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index dcd52086425..1e33793f3b8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -152,7 +152,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
 
   private void allocateBuffers() {
     try {
-      serializeBuffer = ByteBuffer.allocateDirect(ONE_THIRD_WAL_BUFFER_SIZE);
+      serializeBuffer = ByteBuffer.allocate(ONE_THIRD_WAL_BUFFER_SIZE);
     } catch (OutOfMemoryError e) {
       LOGGER.error(
           "Fail to allocate deletionBuffer-group-{}'s buffer because out of memory.",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 9cfe7e324b3..ff8eda308d4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2204,6 +2204,9 @@ public class DataRegion implements IDataRegionForQuery {
     boolean hasReleasedLock = false;
 
     try {
+      if (deleted) {
+        return;
+      }
       TreeDeviceSchemaCacheManager.getInstance().invalidateLastCache(pattern);
       // write log to impacted working TsFileProcessors
       List<WALFlushListener> walListeners =
@@ -2254,6 +2257,9 @@ public class DataRegion implements IDataRegionForQuery {
     writeLock("delete");
     boolean hasReleasedLock = false;
     try {
+      if (deleted) {
+        return;
+      }
       TableDeviceSchemaCache.getInstance()
           .invalidateLastCache(getDatabaseName(), modEntries.get(0).getTableName());
       List<WALFlushListener> walListeners = logDeletionInWAL(node);
@@ -2316,6 +2322,9 @@ public class DataRegion implements IDataRegionForQuery {
     boolean releasedLock = false;
 
     try {
+      if (deleted) {
+        return;
+      }
       TreeDeviceSchemaCacheManager.getInstance().invalidateDatabaseLastCache(getDatabaseName());
       // write log to impacted working TsFileProcessors
       List<WALFlushListener> walListeners =
@@ -2421,12 +2430,20 @@ public class DataRegion implements IDataRegionForQuery {
    * request</a> for details.
    */
   public void insertSeparatorToWAL() {
-    getWALNode()
-        .ifPresent(
-            walNode ->
-                walNode.log(
-                    TsFileProcessor.MEMTABLE_NOT_EXIST,
-                    new ContinuousSameSearchIndexSeparatorNode()));
+    writeLock("insertSeparatorToWAL");
+    try {
+      if (deleted) {
+        return;
+      }
+      getWALNode()
+          .ifPresent(
+              walNode ->
+                  walNode.log(
+                      TsFileProcessor.MEMTABLE_NOT_EXIST,
+                      new ContinuousSameSearchIndexSeparatorNode()));
+    } finally {
+      writeUnlock();
+    }
   }
 
   private boolean canSkipDelete(TsFileResource tsFileResource, ModEntry deletion) {
@@ -3671,6 +3688,10 @@ public class DataRegion implements IDataRegionForQuery {
     return insertWriteLockHolder;
   }
 
+  public boolean isDeleted() {
+    return deleted;
+  }
+
   /** This method could only be used in iot consensus */
   public Optional<IWALNode> getWALNode() {
     if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/FirstCreateStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/FirstCreateStrategy.java
index 42064a3294b..219666773e7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/FirstCreateStrategy.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/FirstCreateStrategy.java
@@ -95,6 +95,7 @@ public class FirstCreateStrategy extends AbstractNodeAllocationStrategy {
     try {
       WALNode walNode = identifier2Nodes.remove(applicantUniqueId);
       if (walNode != null) {
+        walNode.setDeleted(true);
         walNode.close();
         if (walNode.getLogDirectory().exists()) {
           FileUtils.deleteFileOrDirectory(walNode.getLogDirectory());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 7028872fcad..5d88c0915b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -112,6 +112,8 @@ public class WALNode implements IWALNode {
   // insert nodes whose search index are before this value can be deleted safely
   private volatile long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
 
+  private volatile boolean deleted = false;
+
   public WALNode(String identifier, String logDirectory) throws IOException {
     this(identifier, logDirectory, 0, 0L);
   }
@@ -233,6 +235,10 @@ public class WALNode implements IWALNode {
     buffer.write(new WALInfoEntry(memTable.getMemTableId(), checkpoint));
   }
 
+  public void setDeleted(boolean deleted) {
+    this.deleted = deleted;
+  }
+
   // region methods for pipe
 
   /**
@@ -960,7 +966,7 @@ public class WALNode implements IWALNode {
   public void rollWALFile() {
     WALEntry rollWALFileSignal = new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
     WALFlushListener walFlushListener = log(rollWALFileSignal);
-    if (walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) {
+    if (!deleted && walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) {
       logger.error(
           "Fail to trigger rolling wal node-{}'s wal file log writer.",
           identifier,

Reply via email to