This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 911d7b673a6 Pipe: Fixed the bug that schema region listening queue is
not cleared when the schema region is deleted / migrated (#12575)
911d7b673a6 is described below
commit 911d7b673a6ea2c28063a1ba1872b9ce24029672
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 23 17:18:30 2024 +0800
Pipe: Fixed the bug that schema region listening queue is not cleared when
the schema region is deleted / migrated (#12575)
---
.../schemaregion/SchemaExecutionVisitor.java | 25 +++++++++-------------
.../thrift/impl/DataNodeRegionManager.java | 2 ++
.../listening/AbstractPipeListeningQueue.java | 11 +++++-----
.../AbstractSerializableListeningQueue.java | 25 +++++++++++-----------
4 files changed, 30 insertions(+), 33 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index d8b4ff691de..e82ed922045 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -527,34 +527,29 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion>
@Override
public TSStatus visitPipeEnrichedWritePlanNode(
- PipeEnrichedWritePlanNode node, ISchemaRegion schemaRegion) {
+ final PipeEnrichedWritePlanNode node, final ISchemaRegion schemaRegion) {
return node.getWritePlanNode().accept(this, schemaRegion);
}
@Override
public TSStatus visitPipeEnrichedNonWritePlanNode(
- PipeEnrichedNonWritePlanNode node, ISchemaRegion schemaRegion) {
+ final PipeEnrichedNonWritePlanNode node, final ISchemaRegion schemaRegion) {
return node.getNonWritePlanNode().accept(this, schemaRegion);
}
@Override
public TSStatus visitPipeOperateSchemaQueueNode(
- PipeOperateSchemaQueueNode node, ISchemaRegion schemaRegion) {
+ final PipeOperateSchemaQueueNode node, final ISchemaRegion schemaRegion) {
final SchemaRegionId id = schemaRegion.getSchemaRegionId();
final SchemaRegionListeningQueue queue =
PipeAgent.runtime().schemaListener(id);
- try {
- if (node.isOpen() && !queue.isOpened()) {
- logger.info("Opened pipe listening queue on schema region {}", id);
- queue.open();
- } else if (!node.isOpen() && queue.isOpened()) {
- logger.info("Closed pipe listening queue on schema region {}", id);
- queue.close();
- }
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (IOException e) {
- return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
- .setMessage("Failed to clear the queue, because " + e.getMessage());
+ if (node.isOpen() && !queue.isOpened()) {
+ logger.info("Opened pipe listening queue on schema region {}", id);
+ queue.open();
+ } else if (!node.isOpen() && queue.isOpened()) {
+ logger.info("Closed pipe listening queue on schema region {}", id);
+ queue.close();
}
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index 530ce68e997..c7b0d378065 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.rpc.RpcUtils;
@@ -203,6 +204,7 @@ public class DataNodeRegionManager {
public TSStatus deleteSchemaRegion(SchemaRegionId schemaRegionId) {
try {
schemaEngine.deleteSchemaRegion(schemaRegionId);
+ PipeAgent.runtime().schemaListener(schemaRegionId).close();
schemaRegionLockMap.remove(schemaRegionId);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
} catch (MetadataException e) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
index 5e182d4094a..403930be960 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
@@ -30,7 +30,6 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -57,7 +56,7 @@ public abstract class AbstractPipeListeningQueue extends AbstractSerializableLis
/////////////////////////////// Plan ///////////////////////////////
- protected synchronized void tryListen(EnrichedEvent event) {
+ protected synchronized void tryListen(final EnrichedEvent event) {
if (super.tryListen(event)) {
event.increaseReferenceCount(AbstractPipeListeningQueue.class.getName());
}
@@ -65,7 +64,7 @@ public abstract class AbstractPipeListeningQueue extends AbstractSerializableLis
/////////////////////////////// Snapshot Cache ///////////////////////////////
- protected synchronized void tryListen(List<PipeSnapshotEvent> events) {
+ protected synchronized void tryListen(final List<PipeSnapshotEvent> events) {
if (!isClosed.get()) {
clearSnapshots();
queueTailIndex2SnapshotsCache.setLeft(queue.getTailIndex());
@@ -87,7 +86,7 @@ public abstract class AbstractPipeListeningQueue extends AbstractSerializableLis
}
@Override
- public synchronized long removeBefore(long newFirstIndex) {
+ public synchronized long removeBefore(final long newFirstIndex) {
final long result = super.removeBefore(newFirstIndex);
if (queueTailIndex2SnapshotsCache.getLeft() < result) {
clearSnapshots();
@@ -108,13 +107,13 @@ public abstract class AbstractPipeListeningQueue extends AbstractSerializableLis
/////////////////////////////// Close ///////////////////////////////
@Override
- public synchronized void close() throws IOException {
+ public synchronized void close() {
clearSnapshots();
super.close();
}
@Override
- protected void releaseResource(Event event) {
+ protected void releaseResource(final Event event) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event)
.decreaseReferenceCount(AbstractPipeListeningQueue.class.getName(), false);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
index 10482fd4019..afa62aaf61e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
@@ -58,14 +58,14 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable
protected final AtomicBoolean isClosed = new AtomicBoolean(true);
- protected AbstractSerializableListeningQueue(QueueSerializerType serializerType) {
+ protected AbstractSerializableListeningQueue(final QueueSerializerType serializerType) {
this.serializerType = serializerType;
serializers.put(QueueSerializerType.PLAIN, PlainQueueSerializer::new);
}
/////////////////////////////// Function ///////////////////////////////
- protected synchronized boolean tryListen(E element) {
+ protected synchronized boolean tryListen(final E element) {
if (isClosed.get()) {
return false;
}
@@ -74,7 +74,7 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable
}
// Caller should ensure that the "newFirstIndex" is less than every iterators.
- public synchronized long removeBefore(long newFirstIndex) {
+ public synchronized long removeBefore(final long newFirstIndex) {
try (final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator =
queue.iterateFromEarliest()) {
while (iterator.getNextIndex() < newFirstIndex) {
@@ -88,23 +88,24 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable
return queue.tryRemoveBefore(newFirstIndex);
}
- public synchronized boolean isGivenNextIndexValid(long nextIndex) {
+ public synchronized boolean isGivenNextIndexValid(final long nextIndex) {
// The "tailIndex" is permitted to listen to the next incoming element
return queue.isNextIndexValid(nextIndex);
}
- public synchronized ConcurrentIterableLinkedQueue<E>.DynamicIterator newIterator(long nextIndex) {
+ public synchronized ConcurrentIterableLinkedQueue<E>.DynamicIterator newIterator(
+ final long nextIndex) {
return queue.iterateFrom(nextIndex);
}
public synchronized void returnIterator(
- ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator) {
+ final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator) {
iterator.close();
}
/////////////////////////////// Snapshot ///////////////////////////////
- public synchronized boolean serializeToFile(File snapshotName) throws IOException {
+ public synchronized boolean serializeToFile(final File snapshotName) throws IOException {
final File snapshotFile = new File(String.valueOf(snapshotName));
if (snapshotFile.exists() && snapshotFile.isFile()) {
LOGGER.error(
@@ -128,7 +129,7 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable
}
}
- public synchronized void deserializeFromFile(File snapshotName) throws IOException {
+ public synchronized void deserializeFromFile(final File snapshotName) throws IOException {
final File snapshotFile = new File(String.valueOf(snapshotName));
if (!snapshotFile.exists() || !snapshotFile.isFile()) {
LOGGER.error(
@@ -155,7 +156,7 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable
/////////////////////////////// Element Ser / De Method ////////////////////////////////
- protected abstract ByteBuffer serializeToByteBuffer(E element);
+ protected abstract ByteBuffer serializeToByteBuffer(final E element);
/**
* Deserialize a single element from byteBuffer.
@@ -163,7 +164,7 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable
* @param byteBuffer the byteBuffer corresponding to an element
* @return The deserialized element or {@code null} if a failure is encountered.
*/
- protected abstract E deserializeFromByteBuffer(ByteBuffer byteBuffer);
+ protected abstract E deserializeFromByteBuffer(final ByteBuffer byteBuffer);
/////////////////////////////// Open & Close ///////////////////////////////
@@ -172,7 +173,7 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable
}
@Override
- public synchronized void close() throws IOException {
+ public synchronized void close() {
isClosed.set(true);
try (final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator =
@@ -188,7 +189,7 @@ public abstract class AbstractSerializableListeningQueue<E> implements Closeable
queue.clear();
}
- protected abstract void releaseResource(E element);
+ protected abstract void releaseResource(final E element);
public synchronized boolean isOpened() {
return !isClosed.get();