This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 911d7b673a6 Pipe: Fixed the bug that schema region listening queue is 
not cleared when the schema region is deleted / migrated (#12575)
911d7b673a6 is described below

commit 911d7b673a6ea2c28063a1ba1872b9ce24029672
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 23 17:18:30 2024 +0800

    Pipe: Fixed the bug that schema region listening queue is not cleared when 
the schema region is deleted / migrated (#12575)
---
 .../schemaregion/SchemaExecutionVisitor.java       | 25 +++++++++-------------
 .../thrift/impl/DataNodeRegionManager.java         |  2 ++
 .../listening/AbstractPipeListeningQueue.java      | 11 +++++-----
 .../AbstractSerializableListeningQueue.java        | 25 +++++++++++-----------
 4 files changed, 30 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index d8b4ff691de..e82ed922045 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -527,34 +527,29 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
 
   @Override
   public TSStatus visitPipeEnrichedWritePlanNode(
-      PipeEnrichedWritePlanNode node, ISchemaRegion schemaRegion) {
+      final PipeEnrichedWritePlanNode node, final ISchemaRegion schemaRegion) {
     return node.getWritePlanNode().accept(this, schemaRegion);
   }
 
   @Override
   public TSStatus visitPipeEnrichedNonWritePlanNode(
-      PipeEnrichedNonWritePlanNode node, ISchemaRegion schemaRegion) {
+      final PipeEnrichedNonWritePlanNode node, final ISchemaRegion 
schemaRegion) {
     return node.getNonWritePlanNode().accept(this, schemaRegion);
   }
 
   @Override
   public TSStatus visitPipeOperateSchemaQueueNode(
-      PipeOperateSchemaQueueNode node, ISchemaRegion schemaRegion) {
+      final PipeOperateSchemaQueueNode node, final ISchemaRegion schemaRegion) 
{
     final SchemaRegionId id = schemaRegion.getSchemaRegionId();
     final SchemaRegionListeningQueue queue = 
PipeAgent.runtime().schemaListener(id);
-    try {
-      if (node.isOpen() && !queue.isOpened()) {
-        logger.info("Opened pipe listening queue on schema region {}", id);
-        queue.open();
-      } else if (!node.isOpen() && queue.isOpened()) {
-        logger.info("Closed pipe listening queue on schema region {}", id);
-        queue.close();
-      }
-      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (IOException e) {
-      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
-          .setMessage("Failed to clear the queue, because " + e.getMessage());
+    if (node.isOpen() && !queue.isOpened()) {
+      logger.info("Opened pipe listening queue on schema region {}", id);
+      queue.open();
+    } else if (!node.isOpen() && queue.isOpened()) {
+      logger.info("Closed pipe listening queue on schema region {}", id);
+      queue.close();
     }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index 530ce68e997..c7b0d378065 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -203,6 +204,7 @@ public class DataNodeRegionManager {
   public TSStatus deleteSchemaRegion(SchemaRegionId schemaRegionId) {
     try {
       schemaEngine.deleteSchemaRegion(schemaRegionId);
+      PipeAgent.runtime().schemaListener(schemaRegionId).close();
       schemaRegionLockMap.remove(schemaRegionId);
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute 
successfully");
     } catch (MetadataException e) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
index 5e182d4094a..403930be960 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java
@@ -30,7 +30,6 @@ import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -57,7 +56,7 @@ public abstract class AbstractPipeListeningQueue extends 
AbstractSerializableLis
 
   /////////////////////////////// Plan ///////////////////////////////
 
-  protected synchronized void tryListen(EnrichedEvent event) {
+  protected synchronized void tryListen(final EnrichedEvent event) {
     if (super.tryListen(event)) {
       event.increaseReferenceCount(AbstractPipeListeningQueue.class.getName());
     }
@@ -65,7 +64,7 @@ public abstract class AbstractPipeListeningQueue extends 
AbstractSerializableLis
 
   /////////////////////////////// Snapshot Cache 
///////////////////////////////
 
-  protected synchronized void tryListen(List<PipeSnapshotEvent> events) {
+  protected synchronized void tryListen(final List<PipeSnapshotEvent> events) {
     if (!isClosed.get()) {
       clearSnapshots();
       queueTailIndex2SnapshotsCache.setLeft(queue.getTailIndex());
@@ -87,7 +86,7 @@ public abstract class AbstractPipeListeningQueue extends 
AbstractSerializableLis
   }
 
   @Override
-  public synchronized long removeBefore(long newFirstIndex) {
+  public synchronized long removeBefore(final long newFirstIndex) {
     final long result = super.removeBefore(newFirstIndex);
     if (queueTailIndex2SnapshotsCache.getLeft() < result) {
       clearSnapshots();
@@ -108,13 +107,13 @@ public abstract class AbstractPipeListeningQueue extends 
AbstractSerializableLis
   /////////////////////////////// Close ///////////////////////////////
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close() {
     clearSnapshots();
     super.close();
   }
 
   @Override
-  protected void releaseResource(Event event) {
+  protected void releaseResource(final Event event) {
     if (event instanceof EnrichedEvent) {
       ((EnrichedEvent) event)
           .decreaseReferenceCount(AbstractPipeListeningQueue.class.getName(), 
false);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
index 10482fd4019..afa62aaf61e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractSerializableListeningQueue.java
@@ -58,14 +58,14 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
 
   protected final AtomicBoolean isClosed = new AtomicBoolean(true);
 
-  protected AbstractSerializableListeningQueue(QueueSerializerType 
serializerType) {
+  protected AbstractSerializableListeningQueue(final QueueSerializerType 
serializerType) {
     this.serializerType = serializerType;
     serializers.put(QueueSerializerType.PLAIN, PlainQueueSerializer::new);
   }
 
   /////////////////////////////// Function ///////////////////////////////
 
-  protected synchronized boolean tryListen(E element) {
+  protected synchronized boolean tryListen(final E element) {
     if (isClosed.get()) {
       return false;
     }
@@ -74,7 +74,7 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
   }
 
   // Caller should ensure that the "newFirstIndex" is less than every 
iterators.
-  public synchronized long removeBefore(long newFirstIndex) {
+  public synchronized long removeBefore(final long newFirstIndex) {
     try (final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator =
         queue.iterateFromEarliest()) {
       while (iterator.getNextIndex() < newFirstIndex) {
@@ -88,23 +88,24 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
     return queue.tryRemoveBefore(newFirstIndex);
   }
 
-  public synchronized boolean isGivenNextIndexValid(long nextIndex) {
+  public synchronized boolean isGivenNextIndexValid(final long nextIndex) {
     // The "tailIndex" is permitted to listen to the next incoming element
     return queue.isNextIndexValid(nextIndex);
   }
 
-  public synchronized ConcurrentIterableLinkedQueue<E>.DynamicIterator 
newIterator(long nextIndex) {
+  public synchronized ConcurrentIterableLinkedQueue<E>.DynamicIterator 
newIterator(
+      final long nextIndex) {
     return queue.iterateFrom(nextIndex);
   }
 
   public synchronized void returnIterator(
-      ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator) {
+      final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator) {
     iterator.close();
   }
 
   /////////////////////////////// Snapshot ///////////////////////////////
 
-  public synchronized boolean serializeToFile(File snapshotName) throws 
IOException {
+  public synchronized boolean serializeToFile(final File snapshotName) throws 
IOException {
     final File snapshotFile = new File(String.valueOf(snapshotName));
     if (snapshotFile.exists() && snapshotFile.isFile()) {
       LOGGER.error(
@@ -128,7 +129,7 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
     }
   }
 
-  public synchronized void deserializeFromFile(File snapshotName) throws 
IOException {
+  public synchronized void deserializeFromFile(final File snapshotName) throws 
IOException {
     final File snapshotFile = new File(String.valueOf(snapshotName));
     if (!snapshotFile.exists() || !snapshotFile.isFile()) {
       LOGGER.error(
@@ -155,7 +156,7 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
 
   /////////////////////////////// Element Ser / De Method 
////////////////////////////////
 
-  protected abstract ByteBuffer serializeToByteBuffer(E element);
+  protected abstract ByteBuffer serializeToByteBuffer(final E element);
 
   /**
    * Deserialize a single element from byteBuffer.
@@ -163,7 +164,7 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
    * @param byteBuffer the byteBuffer corresponding to an element
    * @return The deserialized element or {@code null} if a failure is 
encountered.
    */
-  protected abstract E deserializeFromByteBuffer(ByteBuffer byteBuffer);
+  protected abstract E deserializeFromByteBuffer(final ByteBuffer byteBuffer);
 
   /////////////////////////////// Open & Close ///////////////////////////////
 
@@ -172,7 +173,7 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public synchronized void close() {
     isClosed.set(true);
 
     try (final ConcurrentIterableLinkedQueue<E>.DynamicIterator iterator =
@@ -188,7 +189,7 @@ public abstract class AbstractSerializableListeningQueue<E> 
implements Closeable
     queue.clear();
   }
 
-  protected abstract void releaseResource(E element);
+  protected abstract void releaseResource(final E element);
 
   public synchronized boolean isOpened() {
     return !isClosed.get();

Reply via email to