This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 23d00ab8282 Subscription: poll heartbeat event when there are no
consumers (#15307) (#15325)
23d00ab8282 is described below
commit 23d00ab82821cfa2f9ef4c95216bd4f9b9a7cf27
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 14 09:52:04 2025 +0800
Subscription: poll heartbeat event when there are no consumers (#15307)
(#15325)
---
.../PipeRealtimePriorityBlockingQueue.java | 9 +++++++
.../broker/SubscriptionBlockingPendingQueue.java | 4 ++++
.../broker/SubscriptionPrefetchingQueue.java | 28 +++++++++++++++++++++-
.../TsFileDeduplicationBlockingPendingQueue.java | 10 ++++++++
.../task/connection/BlockingPendingQueue.java | 4 ++++
5 files changed, 54 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index d9eac5f5625..61391d039af 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -161,6 +161,15 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
return event;
}
+ @Override
+ public Event peek() {
+ final Event event = pendingQueue.peek();
+ if (Objects.nonNull(event)) {
+ return event;
+ }
+ return tsfileInsertEventDeque.peek();
+ }
+
@Override
public void clear() {
super.clear();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
index df3791cc327..b49cbccdfa2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBlockingPendingQueue.java
@@ -33,6 +33,10 @@ public abstract class SubscriptionBlockingPendingQueue {
public abstract Event waitedPoll();
+ public abstract Event peek();
+
+ public abstract void directOffer(final Event event);
+
public int size() {
return inputPendingQueue.size();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index abd3e1c300c..0f638e20d80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.agent.task.execution.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -269,6 +270,7 @@ public abstract class SubscriptionPrefetchingQueue {
committedCleaner, pollableNacker, responsePrefetcher,
responseSerializer);
return true;
} else {
+ peekOnce();
remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
return false;
}
@@ -311,6 +313,30 @@ public abstract class SubscriptionPrefetchingQueue {
prefetchingQueue.add(thisEvent);
}
+ private synchronized void peekOnce() {
+ final Event peekedEvent =
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.peek());
+ if (Objects.isNull(peekedEvent)) {
+ return;
+ }
+
+ if (!(peekedEvent instanceof PipeHeartbeatEvent)) {
+ return;
+ }
+
+ final Event polledEvent = inputPendingQueue.waitedPoll();
+ if (!Objects.equals(peekedEvent, polledEvent)) {
+ LOGGER.warn(
+ "Subscription: inconsistent heartbeat event when {} peeking (broken
invariant), expected {}, actual {}, offer back",
+ this,
+ peekedEvent,
+ polledEvent);
+ inputPendingQueue.directOffer(polledEvent);
+ } else {
+ ((PipeHeartbeatEvent) peekedEvent)
+
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), false);
+ }
+ }
+
/**
* Prefetch at most one {@link SubscriptionEvent} from {@link
* SubscriptionPrefetchingQueue#inputPendingQueue} to {@link
@@ -319,7 +345,7 @@ public abstract class SubscriptionPrefetchingQueue {
* <p>It will continuously attempt to prefetch and generate a {@link
SubscriptionEvent} until
* {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
*/
- private void tryPrefetch() {
+ private synchronized void tryPrefetch() {
while (!inputPendingQueue.isEmpty()) {
final Event event =
UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
if (Objects.isNull(event)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index bf2583c0791..e4a3a545a58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -58,6 +58,16 @@ public class TsFileDeduplicationBlockingPendingQueue extends
SubscriptionBlockin
return filter(inputPendingQueue.waitedPoll());
}
+ @Override
+ public Event peek() {
+ return inputPendingQueue.peek();
+ }
+
+ @Override
+ public void directOffer(final Event event) {
+ inputPendingQueue.directOffer(event);
+ }
+
private synchronized Event filter(final Event event) { // make it
synchronized
if (Objects.isNull(event)) {
return null;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index 82518bda14d..82170661deb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -107,6 +107,10 @@ public abstract class BlockingPendingQueue<E extends
Event> {
return event;
}
+ public E peek() {
+ return pendingQueue.peek();
+ }
+
public void clear() {
isClosed.set(true);
pendingQueue.clear();