This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 95ed457dc08 Subscription: detect outdated subscription event for better consumer exception control (#15326)
95ed457dc08 is described below

commit 95ed457dc085401b891b2c6539dde24f66538d61
Author: VGalaxies <[email protected]>
AuthorDate: Thu Apr 17 10:39:21 2025 +0800

    Subscription: detect outdated subscription event for better consumer exception control (#15326)
---
 .../subscription/payload/poll/ErrorPayload.java    |  4 +++
 .../base/AbstractSubscriptionConsumer.java         | 13 +++++-----
 .../agent/SubscriptionBrokerAgent.java             |  9 +++++++
 .../db/subscription/broker/SubscriptionBroker.java | 10 ++++++++
 .../broker/SubscriptionPrefetchingQueue.java       | 21 ++++++++++++++++
 .../broker/SubscriptionPrefetchingTabletQueue.java | 10 ++++++++
 .../broker/SubscriptionPrefetchingTsFileQueue.java | 10 ++++++++
 .../receiver/SubscriptionReceiverV1.java           | 29 ++++++++++++++++------
 8 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java
index a0a27787fee..4cf4d3d96b4 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java
@@ -28,6 +28,10 @@ import java.util.Objects;
 
 public class ErrorPayload implements SubscriptionPollPayload {
 
+  private static final String OUTDATED_ERROR_MSG = "outdated subscription event";
+  public static final ErrorPayload OUTDATED_ERROR_PAYLOAD =
+      new ErrorPayload(OUTDATED_ERROR_MSG, false);
+
   /** The error message describing the issue. */
   private transient String errorMessage;
 
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index 37c17583662..0ef5062755b 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -975,13 +975,10 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable {
       final List<SubscriptionPollResponse> responses =
           pollTabletsInternal(commitContext, nextOffset, timer.remainingMs());
 
-      // It's agreed that the server will always return at least one response, even in case of
-      // failure.
+      // If responses is empty, it means that some outdated subscription events may be being polled,
+      // so just return.
       if (responses.isEmpty()) {
-        final String errorMessage =
-            String.format("SubscriptionConsumer %s poll empty response", this);
-        LOGGER.warn(errorMessage);
-        throw new SubscriptionRuntimeNonCriticalException(errorMessage);
+        return Optional.empty();
       }
 
       // only one SubscriptionEvent polled currently
@@ -1022,6 +1019,10 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable {
 
             final String errorMessage = ((ErrorPayload) payload).getErrorMessage();
             final boolean critical = ((ErrorPayload) payload).isCritical();
+            if (Objects.equals(payload, ErrorPayload.OUTDATED_ERROR_PAYLOAD)) {
+              // suppress warn log when poll outdated subscription event
+              return Optional.empty();
+            }
             LOGGER.warn(
                 "Error occurred when SubscriptionConsumer {} polling tablets with commit context {}: {}, critical: {}",
                 this,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index f1ee9f8867b..057f31aa66b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -116,6 +116,15 @@ public class SubscriptionBrokerAgent {
     return broker.commit(consumerId, commitContexts, nack);
   }
 
+  public boolean isCommitContextOutdated(final SubscriptionCommitContext commitContext) {
+    final String consumerGroupId = commitContext.getConsumerGroupId();
+    final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+    if (Objects.isNull(broker)) {
+      return true;
+    }
+    return broker.isCommitContextOutdated(commitContext);
+  }
+
   /////////////////////////////// broker ///////////////////////////////
 
   public boolean isBrokerExist(final String consumerGroupId) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 1223672810d..f2ed903f74f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -329,6 +329,16 @@ public class SubscriptionBroker {
     return successfulCommitContexts;
   }
 
+  public boolean isCommitContextOutdated(final SubscriptionCommitContext commitContext) {
+    final String topicName = commitContext.getTopicName();
+    final SubscriptionPrefetchingQueue prefetchingQueue =
+        topicNameToPrefetchingQueue.get(topicName);
+    if (Objects.isNull(prefetchingQueue)) {
+      return true;
+    }
+    return prefetchingQueue.isCommitContextOutdated(commitContext);
+  }
+
   /////////////////////////////// prefetching queue ///////////////////////////////
 
   public void bindPrefetchingQueue(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 0f638e20d80..f0aed45e727 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -68,6 +68,8 @@ public abstract class SubscriptionPrefetchingQueue {
   private final SubscriptionBlockingPendingQueue inputPendingQueue;
 
   private final AtomicLong commitIdGenerator;
+  // record initial commit for outdated event detection
+  private final long initialCommitId;
 
   /** A queue containing a series of prefetched pollable {@link SubscriptionEvent}. */
   protected final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue;
@@ -113,6 +115,7 @@ public abstract class SubscriptionPrefetchingQueue {
     this.topicName = topicName;
     this.inputPendingQueue = inputPendingQueue;
     this.commitIdGenerator = commitIdGenerator;
+    this.initialCommitId = commitIdGenerator.get();
 
     this.prefetchingQueue = new PriorityBlockingQueue<>();
     this.inFlightEvents = new ConcurrentHashMap<>();
@@ -582,6 +585,24 @@ public abstract class SubscriptionPrefetchingQueue {
             INVALID_COMMIT_ID));
   }
 
+  protected SubscriptionEvent generateSubscriptionPollOutdatedErrorResponse() {
+    // consider non-critical by default, meaning the client can retry
+    return new SubscriptionEvent(
+        SubscriptionPollResponseType.ERROR.getType(),
+        ErrorPayload.OUTDATED_ERROR_PAYLOAD,
+        new SubscriptionCommitContext(
+            IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+            PipeDataNodeAgent.runtime().getRebootTimes(),
+            topicName,
+            brokerId,
+            INVALID_COMMIT_ID));
+  }
+
+  public boolean isCommitContextOutdated(final SubscriptionCommitContext commitContext) {
+    return PipeDataNodeAgent.runtime().getRebootTimes() > commitContext.getRebootTimes()
+        || initialCommitId > commitContext.getCommitId();
+  }
+
   //////////////////////////// APIs provided for metric framework ////////////////////////////
 
   public String getPrefetchingQueueId() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 4b7a166b922..6596b26e7d8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -78,6 +78,16 @@ public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQ
         (key, ev) -> {
           // 1. Extract current event and check it
           if (Objects.isNull(ev)) {
+            if (isCommitContextOutdated(commitContext)) {
+              LOGGER.warn(
+                  "SubscriptionPrefetchingTabletQueue {} detected outdated poll request, consumer {}, commit context {}, offset {}",
+                  this,
+                  consumerId,
+                  commitContext,
+                  offset);
+              eventRef.set(generateSubscriptionPollOutdatedErrorResponse());
+              return null;
+            }
             final String errorMessage =
                 String.format(
                     "SubscriptionPrefetchingTabletQueue %s is currently not transferring any tablet to consumer %s, commit context: %s, offset: %s",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 98156972353..1b29d1d97c3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -84,6 +84,16 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ
         (key, ev) -> {
           // 1. Extract current event and check it
           if (Objects.isNull(ev)) {
+            if (isCommitContextOutdated(commitContext)) {
+              LOGGER.warn(
+                  "SubscriptionPrefetchingTsFileQueue {} detected outdated poll request, consumer {}, commit context {}, writing offset {}",
+                  this,
+                  consumerId,
+                  commitContext,
+                  writingOffset);
+              eventRef.set(generateSubscriptionPollOutdatedErrorResponse());
+              return null;
+            }
             final String errorMessage =
                 String.format(
                     "SubscriptionPrefetchingTsFileQueue %s is currently not transferring any file to consumer %s, commit context: %s, writing offset: %s",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 7a7459d0ab2..87fde40a432 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -425,14 +425,20 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
                   final SubscriptionCommitContext commitContext = event.getCommitContext();
                   final SubscriptionPollResponse response = event.getCurrentResponse();
                   if (Objects.isNull(response)) {
+                    final boolean isOutdated =
+                        SubscriptionAgent.broker()
+                            .isCommitContextOutdated(event.getCommitContext());
                     LOGGER.warn(
-                        "Subscription: consumer {} poll null response for event {} with request: {}",
+                        "Subscription: consumer {} poll null response for event {} (outdated: {}) with request: {}",
                         consumerConfig,
                         event,
+                        isOutdated,
                         req.getRequest());
                     // nack
-                    SubscriptionAgent.broker()
-                        .commit(consumerConfig, Collections.singletonList(commitContext), true);
+                    if (!isOutdated) {
+                      SubscriptionAgent.broker()
+                          .commit(consumerConfig, Collections.singletonList(commitContext), true);
+                    }
                     return null;
                   }
 
@@ -462,24 +468,33 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
                         req.getRequest());
                     return byteBuffer;
                   } catch (final Exception e) {
+                    final boolean isOutdated =
+                        SubscriptionAgent.broker()
+                            .isCommitContextOutdated(event.getCommitContext());
                     if (e instanceof SubscriptionPayloadExceedException) {
                       LOGGER.error(
-                          "Subscription: consumer {} poll excessive payload {} with request: {}, something unexpected happened with parameter configuration or payload control...",
+                          "Subscription: consumer {} poll excessive payload {} for event {} (outdated: {}) with request: {}, something unexpected happened with parameter configuration or payload control...",
                           consumerConfig,
                           response,
+                          event,
+                          isOutdated,
                           req.getRequest(),
                           e);
                     } else {
                       LOGGER.warn(
-                          "Subscription: consumer {} poll {} failed with request: {}",
+                          "Subscription: consumer {} poll {} for event {} (outdated: {}) failed with request: {}",
                           consumerConfig,
                           response,
+                          event,
+                          isOutdated,
                           req.getRequest(),
                           e);
                     }
                     // nack
-                    SubscriptionAgent.broker()
-                        .commit(consumerConfig, Collections.singletonList(commitContext), true);
+                    if (!isOutdated) {
+                      SubscriptionAgent.broker()
+                          .commit(consumerConfig, Collections.singletonList(commitContext), true);
+                    }
                     return null;
                   }
                 })

Reply via email to