This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 81db737e325 Add lock for insertSeparatorToWAL (#14478)
81db737e325 is described below
commit 81db737e325d8cc4b1ecfd7339ff719fd314d5c2
Author: shuwenwei <[email protected]>
AuthorDate: Fri Dec 20 12:08:37 2024 +0800
Add lock for insertSeparatorToWAL (#14478)
* add lock for insertSeparatorToWAL
* add lock for visitDeleteData
---
.../dataregion/DataExecutionVisitor.java | 4 +++
.../deletion/persist/PageCacheDeletionBuffer.java | 2 +-
.../db/storageengine/dataregion/DataRegion.java | 33 ++++++++++++++++++----
.../wal/allocation/FirstCreateStrategy.java | 1 +
.../storageengine/dataregion/wal/node/WALNode.java | 8 +++++-
5 files changed, 40 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 3cc1acdbf76..c0fe1f3a14a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -242,6 +242,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
@Override
public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
+ dataRegion.writeLock("deleteData");
try {
for (MeasurementPath path : node.getPathList()) {
MeasurementPath databaseToDelete =
@@ -262,6 +263,8 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
} catch (IOException | IllegalPathException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
return new TSStatus(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+ } finally {
+ dataRegion.writeUnlock();
}
}
@@ -269,6 +272,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitDeleteData(RelationalDeleteDataNode node, DataRegion dataRegion) {
try {
dataRegion.deleteByTable(node);
+ dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (IOException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index dcd52086425..1e33793f3b8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -152,7 +152,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
private void allocateBuffers() {
try {
- serializeBuffer = ByteBuffer.allocateDirect(ONE_THIRD_WAL_BUFFER_SIZE);
+ serializeBuffer = ByteBuffer.allocate(ONE_THIRD_WAL_BUFFER_SIZE);
} catch (OutOfMemoryError e) {
LOGGER.error(
"Fail to allocate deletionBuffer-group-{}'s buffer because out of memory.",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 9cfe7e324b3..ff8eda308d4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2204,6 +2204,9 @@ public class DataRegion implements IDataRegionForQuery {
boolean hasReleasedLock = false;
try {
+ if (deleted) {
+ return;
+ }
TreeDeviceSchemaCacheManager.getInstance().invalidateLastCache(pattern);
// write log to impacted working TsFileProcessors
List<WALFlushListener> walListeners =
@@ -2254,6 +2257,9 @@ public class DataRegion implements IDataRegionForQuery {
writeLock("delete");
boolean hasReleasedLock = false;
try {
+ if (deleted) {
+ return;
+ }
TableDeviceSchemaCache.getInstance()
.invalidateLastCache(getDatabaseName(), modEntries.get(0).getTableName());
List<WALFlushListener> walListeners = logDeletionInWAL(node);
@@ -2316,6 +2322,9 @@ public class DataRegion implements IDataRegionForQuery {
boolean releasedLock = false;
try {
+ if (deleted) {
+ return;
+ }
TreeDeviceSchemaCacheManager.getInstance().invalidateDatabaseLastCache(getDatabaseName());
// write log to impacted working TsFileProcessors
List<WALFlushListener> walListeners =
@@ -2421,12 +2430,20 @@ public class DataRegion implements IDataRegionForQuery {
* request</a> for details.
*/
public void insertSeparatorToWAL() {
- getWALNode()
- .ifPresent(
- walNode ->
- walNode.log(
- TsFileProcessor.MEMTABLE_NOT_EXIST,
- new ContinuousSameSearchIndexSeparatorNode()));
+ writeLock("insertSeparatorToWAL");
+ try {
+ if (deleted) {
+ return;
+ }
+ getWALNode()
+ .ifPresent(
+ walNode ->
+ walNode.log(
+ TsFileProcessor.MEMTABLE_NOT_EXIST,
+ new ContinuousSameSearchIndexSeparatorNode()));
+ } finally {
+ writeUnlock();
+ }
}
private boolean canSkipDelete(TsFileResource tsFileResource, ModEntry deletion) {
@@ -3671,6 +3688,10 @@ public class DataRegion implements IDataRegionForQuery {
return insertWriteLockHolder;
}
+ public boolean isDeleted() {
+ return deleted;
+ }
+
/** This method could only be used in iot consensus */
public Optional<IWALNode> getWALNode() {
if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/FirstCreateStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/FirstCreateStrategy.java
index 42064a3294b..219666773e7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/FirstCreateStrategy.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/FirstCreateStrategy.java
@@ -95,6 +95,7 @@ public class FirstCreateStrategy extends AbstractNodeAllocationStrategy {
try {
WALNode walNode = identifier2Nodes.remove(applicantUniqueId);
if (walNode != null) {
+ walNode.setDeleted(true);
walNode.close();
if (walNode.getLogDirectory().exists()) {
FileUtils.deleteFileOrDirectory(walNode.getLogDirectory());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 7028872fcad..5d88c0915b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -112,6 +112,8 @@ public class WALNode implements IWALNode {
// insert nodes whose search index are before this value can be deleted safely
private volatile long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
+ private volatile boolean deleted = false;
+
public WALNode(String identifier, String logDirectory) throws IOException {
this(identifier, logDirectory, 0, 0L);
}
@@ -233,6 +235,10 @@ public class WALNode implements IWALNode {
buffer.write(new WALInfoEntry(memTable.getMemTableId(), checkpoint));
}
+ public void setDeleted(boolean deleted) {
+ this.deleted = deleted;
+ }
+
// region methods for pipe
/**
@@ -960,7 +966,7 @@ public class WALNode implements IWALNode {
public void rollWALFile() {
WALEntry rollWALFileSignal = new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
WALFlushListener walFlushListener = log(rollWALFileSignal);
- if (walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) {
+ if (!deleted && walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) {
logger.error(
"Fail to trigger rolling wal node-{}'s wal file log writer.",
identifier,