This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 23d00ab8282 Subscription: poll heartbeat event when there are no consumers (#15307) (#15325)
23d00ab8282 is described below

commit 23d00ab82821cfa2f9ef4c95216bd4f9b9a7cf27
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 14 09:52:04 2025 +0800

    Subscription: poll heartbeat event when there are no consumers (#15307) (#15325)
---
 .../PipeRealtimePriorityBlockingQueue.java         |  9 +++++++
 .../broker/SubscriptionBlockingPendingQueue.java   |  4 ++++
 .../broker/SubscriptionPrefetchingQueue.java       | 28 +++++++++++++++++++++-
 .../TsFileDeduplicationBlockingPendingQueue.java   | 10 ++++++++
 .../task/connection/BlockingPendingQueue.java      |  4 ++++
 5 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index d9eac5f5625..61391d039af 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -161,6 +161,15 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ
     return event;
   }
 
+  @Override
+  public Event peek() {
+    final Event event = pendingQueue.peek();
+    if (Objects.nonNull(event)) {
+      return event;
+    }
+    return tsfileInsertEventDeque.peek();
+  }
+
   @Override
   public void clear() {
     super.clear();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
index df3791cc327..b49cbccdfa2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
@@ -33,6 +33,10 @@ public abstract class SubscriptionBlockingPendingQueue {
 
   public abstract Event waitedPoll();
 
+  public abstract Event peek();
+
+  public abstract void directOffer(final Event event);
+
   public int size() {
     return inputPendingQueue.size();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index abd3e1c300c..0f638e20d80 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -269,6 +270,7 @@ public abstract class SubscriptionPrefetchingQueue {
             committedCleaner, pollableNacker, responsePrefetcher, responseSerializer);
         return true;
       } else {
+        peekOnce();
         remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
         return false;
       }
@@ -311,6 +313,30 @@ public abstract class SubscriptionPrefetchingQueue {
     prefetchingQueue.add(thisEvent);
   }
 
+  private synchronized void peekOnce() {
+    final Event peekedEvent = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.peek());
+    if (Objects.isNull(peekedEvent)) {
+      return;
+    }
+
+    if (!(peekedEvent instanceof PipeHeartbeatEvent)) {
+      return;
+    }
+
+    final Event polledEvent = inputPendingQueue.waitedPoll();
+    if (!Objects.equals(peekedEvent, polledEvent)) {
+      LOGGER.warn(
+          "Subscription: inconsistent heartbeat event when {} peeking (broken invariant), expected {}, actual {}, offer back",
+          this,
+          peekedEvent,
+          polledEvent);
+      inputPendingQueue.directOffer(polledEvent);
+    } else {
+      ((PipeHeartbeatEvent) peekedEvent)
+          .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), false);
+    }
+  }
+
   /**
    * Prefetch at most one {@link SubscriptionEvent} from {@link
    * SubscriptionPrefetchingQueue#inputPendingQueue} to {@link
@@ -319,7 +345,7 @@ public abstract class SubscriptionPrefetchingQueue {
    * <p>It will continuously attempt to prefetch and generate a {@link SubscriptionEvent} until
    * {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
    */
-  private void tryPrefetch() {
+  private synchronized void tryPrefetch() {
     while (!inputPendingQueue.isEmpty()) {
       final Event event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
       if (Objects.isNull(event)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index bf2583c0791..e4a3a545a58 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -58,6 +58,16 @@ public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockin
     return filter(inputPendingQueue.waitedPoll());
   }
 
+  @Override
+  public Event peek() {
+    return inputPendingQueue.peek();
+  }
+
+  @Override
+  public void directOffer(final Event event) {
+    inputPendingQueue.directOffer(event);
+  }
+
   private synchronized Event filter(final Event event) { // make it synchronized
     if (Objects.isNull(event)) {
       return null;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 82518bda14d..82170661deb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -107,6 +107,10 @@ public abstract class BlockingPendingQueue<E extends Event> {
     return event;
   }
 
+  public E peek() {
+    return pendingQueue.peek();
+  }
+
   public void clear() {
     isClosed.set(true);
     pendingQueue.clear();

Reply via email to