This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 95ed457dc08 Subscription: detect outdated subscription event for better consumer exception control (#15326)
95ed457dc08 is described below
commit 95ed457dc085401b891b2c6539dde24f66538d61
Author: VGalaxies <[email protected]>
AuthorDate: Thu Apr 17 10:39:21 2025 +0800
Subscription: detect outdated subscription event for better consumer exception control (#15326)
---
.../subscription/payload/poll/ErrorPayload.java | 4 +++
.../base/AbstractSubscriptionConsumer.java | 13 +++++-----
.../agent/SubscriptionBrokerAgent.java | 9 +++++++
.../db/subscription/broker/SubscriptionBroker.java | 10 ++++++++
.../broker/SubscriptionPrefetchingQueue.java | 21 ++++++++++++++++
.../broker/SubscriptionPrefetchingTabletQueue.java | 10 ++++++++
.../broker/SubscriptionPrefetchingTsFileQueue.java | 10 ++++++++
.../receiver/SubscriptionReceiverV1.java | 29 ++++++++++++++++------
8 files changed, 93 insertions(+), 13 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java
index a0a27787fee..4cf4d3d96b4 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/ErrorPayload.java
@@ -28,6 +28,10 @@ import java.util.Objects;
public class ErrorPayload implements SubscriptionPollPayload {
+ private static final String OUTDATED_ERROR_MSG = "outdated subscription
event";
+ public static final ErrorPayload OUTDATED_ERROR_PAYLOAD =
+ new ErrorPayload(OUTDATED_ERROR_MSG, false);
+
/** The error message describing the issue. */
private transient String errorMessage;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index 37c17583662..0ef5062755b 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -975,13 +975,10 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
final List<SubscriptionPollResponse> responses =
pollTabletsInternal(commitContext, nextOffset, timer.remainingMs());
- // It's agreed that the server will always return at least one response,
even in case of
- // failure.
+ // If responses is empty, it means that some outdated subscription
events may be being polled,
+ // so just return.
if (responses.isEmpty()) {
- final String errorMessage =
- String.format("SubscriptionConsumer %s poll empty response", this);
- LOGGER.warn(errorMessage);
- throw new SubscriptionRuntimeNonCriticalException(errorMessage);
+ return Optional.empty();
}
// only one SubscriptionEvent polled currently
@@ -1022,6 +1019,10 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
final String errorMessage = ((ErrorPayload)
payload).getErrorMessage();
final boolean critical = ((ErrorPayload) payload).isCritical();
+ if (Objects.equals(payload, ErrorPayload.OUTDATED_ERROR_PAYLOAD)) {
+ // suppress warn log when poll outdated subscription event
+ return Optional.empty();
+ }
LOGGER.warn(
"Error occurred when SubscriptionConsumer {} polling tablets
with commit context {}: {}, critical: {}",
this,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index f1ee9f8867b..057f31aa66b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -116,6 +116,15 @@ public class SubscriptionBrokerAgent {
return broker.commit(consumerId, commitContexts, nack);
}
+ public boolean isCommitContextOutdated(final SubscriptionCommitContext
commitContext) {
+ final String consumerGroupId = commitContext.getConsumerGroupId();
+ final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+ if (Objects.isNull(broker)) {
+ return true;
+ }
+ return broker.isCommitContextOutdated(commitContext);
+ }
+
/////////////////////////////// broker ///////////////////////////////
public boolean isBrokerExist(final String consumerGroupId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 1223672810d..f2ed903f74f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -329,6 +329,16 @@ public class SubscriptionBroker {
return successfulCommitContexts;
}
+ public boolean isCommitContextOutdated(final SubscriptionCommitContext
commitContext) {
+ final String topicName = commitContext.getTopicName();
+ final SubscriptionPrefetchingQueue prefetchingQueue =
+ topicNameToPrefetchingQueue.get(topicName);
+ if (Objects.isNull(prefetchingQueue)) {
+ return true;
+ }
+ return prefetchingQueue.isCommitContextOutdated(commitContext);
+ }
+
/////////////////////////////// prefetching queue
///////////////////////////////
public void bindPrefetchingQueue(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 0f638e20d80..f0aed45e727 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -68,6 +68,8 @@ public abstract class SubscriptionPrefetchingQueue {
private final SubscriptionBlockingPendingQueue inputPendingQueue;
private final AtomicLong commitIdGenerator;
+ // record initial commit for outdated event detection
+ private final long initialCommitId;
/** A queue containing a series of prefetched pollable {@link
SubscriptionEvent}. */
protected final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue;
@@ -113,6 +115,7 @@ public abstract class SubscriptionPrefetchingQueue {
this.topicName = topicName;
this.inputPendingQueue = inputPendingQueue;
this.commitIdGenerator = commitIdGenerator;
+ this.initialCommitId = commitIdGenerator.get();
this.prefetchingQueue = new PriorityBlockingQueue<>();
this.inFlightEvents = new ConcurrentHashMap<>();
@@ -582,6 +585,24 @@ public abstract class SubscriptionPrefetchingQueue {
INVALID_COMMIT_ID));
}
+ protected SubscriptionEvent generateSubscriptionPollOutdatedErrorResponse() {
+ // consider non-critical by default, meaning the client can retry
+ return new SubscriptionEvent(
+ SubscriptionPollResponseType.ERROR.getType(),
+ ErrorPayload.OUTDATED_ERROR_PAYLOAD,
+ new SubscriptionCommitContext(
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+ PipeDataNodeAgent.runtime().getRebootTimes(),
+ topicName,
+ brokerId,
+ INVALID_COMMIT_ID));
+ }
+
+ public boolean isCommitContextOutdated(final SubscriptionCommitContext
commitContext) {
+ return PipeDataNodeAgent.runtime().getRebootTimes() >
commitContext.getRebootTimes()
+ || initialCommitId > commitContext.getCommitId();
+ }
+
//////////////////////////// APIs provided for metric framework
////////////////////////////
public String getPrefetchingQueueId() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 4b7a166b922..6596b26e7d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -78,6 +78,16 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
(key, ev) -> {
// 1. Extract current event and check it
if (Objects.isNull(ev)) {
+ if (isCommitContextOutdated(commitContext)) {
+ LOGGER.warn(
+ "SubscriptionPrefetchingTabletQueue {} detected outdated
poll request, consumer {}, commit context {}, offset {}",
+ this,
+ consumerId,
+ commitContext,
+ offset);
+ eventRef.set(generateSubscriptionPollOutdatedErrorResponse());
+ return null;
+ }
final String errorMessage =
String.format(
"SubscriptionPrefetchingTabletQueue %s is currently not
transferring any tablet to consumer %s, commit context: %s, offset: %s",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 98156972353..1b29d1d97c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -84,6 +84,16 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
(key, ev) -> {
// 1. Extract current event and check it
if (Objects.isNull(ev)) {
+ if (isCommitContextOutdated(commitContext)) {
+ LOGGER.warn(
+ "SubscriptionPrefetchingTsFileQueue {} detected outdated
poll request, consumer {}, commit context {}, writing offset {}",
+ this,
+ consumerId,
+ commitContext,
+ writingOffset);
+ eventRef.set(generateSubscriptionPollOutdatedErrorResponse());
+ return null;
+ }
final String errorMessage =
String.format(
"SubscriptionPrefetchingTsFileQueue %s is currently not
transferring any file to consumer %s, commit context: %s, writing offset: %s",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 7a7459d0ab2..87fde40a432 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -425,14 +425,20 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
final SubscriptionCommitContext commitContext =
event.getCommitContext();
final SubscriptionPollResponse response =
event.getCurrentResponse();
if (Objects.isNull(response)) {
+ final boolean isOutdated =
+ SubscriptionAgent.broker()
+ .isCommitContextOutdated(event.getCommitContext());
LOGGER.warn(
- "Subscription: consumer {} poll null response for
event {} with request: {}",
+ "Subscription: consumer {} poll null response for
event {} (outdated: {}) with request: {}",
consumerConfig,
event,
+ isOutdated,
req.getRequest());
// nack
- SubscriptionAgent.broker()
- .commit(consumerConfig,
Collections.singletonList(commitContext), true);
+ if (!isOutdated) {
+ SubscriptionAgent.broker()
+ .commit(consumerConfig,
Collections.singletonList(commitContext), true);
+ }
return null;
}
@@ -462,24 +468,33 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
req.getRequest());
return byteBuffer;
} catch (final Exception e) {
+ final boolean isOutdated =
+ SubscriptionAgent.broker()
+ .isCommitContextOutdated(event.getCommitContext());
if (e instanceof SubscriptionPayloadExceedException) {
LOGGER.error(
- "Subscription: consumer {} poll excessive payload {}
with request: {}, something unexpected happened with parameter configuration or
payload control...",
+ "Subscription: consumer {} poll excessive payload {}
for event {} (outdated: {}) with request: {}, something unexpected happened
with parameter configuration or payload control...",
consumerConfig,
response,
+ event,
+ isOutdated,
req.getRequest(),
e);
} else {
LOGGER.warn(
- "Subscription: consumer {} poll {} failed with
request: {}",
+ "Subscription: consumer {} poll {} for event {}
(outdated: {}) failed with request: {}",
consumerConfig,
response,
+ event,
+ isOutdated,
req.getRequest(),
e);
}
// nack
- SubscriptionAgent.broker()
- .commit(consumerConfig,
Collections.singletonList(commitContext), true);
+ if (!isOutdated) {
+ SubscriptionAgent.broker()
+ .commit(consumerConfig,
Collections.singletonList(commitContext), true);
+ }
return null;
}
})