This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new eacd93b0675 feat(subscription): support consumer timeout and idle disconnect (#17300)
eacd93b0675 is described below
commit eacd93b0675e07cfb82ada79af3db9fef497c3c6
Author: VGalaxies <[email protected]>
AuthorDate: Thu Mar 19 15:38:11 2026 +0800
feat(subscription): support consumer timeout and idle disconnect (#17300)
---
.../rpc/subscription/config/ConsumerConfig.java | 12 ++
.../rpc/subscription/config/ConsumerConstant.java | 3 +
.../consumer/SubscriptionConsumer.java | 18 ++-
.../consumer/SubscriptionProvider.java | 15 ++-
.../consumer/SubscriptionPullConsumer.java | 6 +
.../consumer/SubscriptionPushConsumer.java | 6 +
.../agent/SubscriptionReceiverAgent.java | 26 +++-
.../receiver/SubscriptionReceiver.java | 2 +
.../receiver/SubscriptionReceiverV1.java | 144 ++++++++++++++++-----
.../receiver/SubscriptionReceiverV1Test.java | 114 ++++++++++++++++
10 files changed, 312 insertions(+), 34 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 3bcb984732c..5094ae7eea4 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -68,6 +68,18 @@ public class ConsumerConfig extends PipeParameters {
return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
}
+ public long getHeartbeatIntervalMs() {
+ return getLongOrDefault(
+ ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
+ ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE);
+ }
+
+ public int getConnectionTimeoutInMs() {
+ return getIntOrDefault(
+ ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY,
+ ConsumerConstant.CONNECTION_TIMEOUT_MS_DEFAULT_VALUE);
+ }
+
public void setConsumerId(final String consumerId) {
attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index 5ad1d9ba435..504893b80ed 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -52,6 +52,9 @@ public class ConsumerConstant {
public static final String THRIFT_MAX_FRAME_SIZE_KEY = "thrift-max-frame-size";
+ public static final String CONNECTION_TIMEOUT_MS_KEY = "connection-timeout-ms";
+ public static final int CONNECTION_TIMEOUT_MS_DEFAULT_VALUE = 0;
+
public static final String MAX_POLL_PARALLELISM_KEY = "max-poll-parallelism";
public static final int MAX_POLL_PARALLELISM_DEFAULT_VALUE = 1;
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index feeb4543eb7..df12472f78f 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -118,6 +118,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
private final Set<SubscriptionCommitContext> inFlightFilesCommitContextSet =
new HashSet<>();
private final int thriftMaxFrameSize;
+ private final int connectionTimeoutInMs;
private final int maxPollParallelism;
@SuppressWarnings("java:S3077")
@@ -187,6 +188,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
this.fileSaveFsync = builder.fileSaveFsync;
this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
+ this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
this.maxPollParallelism = builder.maxPollParallelism;
}
@@ -231,6 +233,11 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
properties.getOrDefault(
ConsumerConstant.THRIFT_MAX_FRAME_SIZE_KEY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE))
+ .connectionTimeoutInMs(
+ (Integer)
+ properties.getOrDefault(
+ ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY,
+ SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS))
.maxPollParallelism(
(Integer)
properties.getOrDefault(
@@ -381,7 +388,9 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
this.password,
this.consumerId,
this.consumerGroupId,
- this.thriftMaxFrameSize);
+ this.thriftMaxFrameSize,
+ this.heartbeatIntervalMs,
+ this.connectionTimeoutInMs);
try {
provider.handshake();
} catch (final Exception e) {
@@ -1397,6 +1406,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
protected boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE;
protected int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
+ protected int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
protected int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE;
public Builder host(final String host) {
@@ -1467,6 +1477,11 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
return this;
}
+ public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) {
+ this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0);
+ return this;
+ }
+
public Builder maxPollParallelism(final int maxPollParallelism) {
// Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a
// single thread to execute poll whenever there are idle resources available, thereby
@@ -1509,6 +1524,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
result.put("fileSaveFsync", String.valueOf(fileSaveFsync));
result.put("inFlightFilesCommitContextSet", inFlightFilesCommitContextSet.toString());
result.put("thriftMaxFrameSize", String.valueOf(thriftMaxFrameSize));
+ result.put("connectionTimeoutInMs", String.valueOf(connectionTimeoutInMs));
result.put("maxPollParallelism", String.valueOf(maxPollParallelism));
result.put("subscribedTopics", subscribedTopics.toString());
return result;
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index 03eefeaddba..9f153efeb10 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -83,6 +83,8 @@ final class SubscriptionProvider extends SubscriptionSession {
private final AtomicBoolean isAvailable = new AtomicBoolean(false);
private final TEndPoint endPoint;
+ private final long heartbeatIntervalMs;
+ private final int connectionTimeoutInMs;
private int dataNodeId;
SubscriptionProvider(
@@ -91,12 +93,16 @@ final class SubscriptionProvider extends
SubscriptionSession {
final String password,
final String consumerId,
final String consumerGroupId,
- final int thriftMaxFrameSize) {
+ final int thriftMaxFrameSize,
+ final long heartbeatIntervalMs,
+ final int connectionTimeoutInMs) {
super(endPoint.ip, endPoint.port, username, password, thriftMaxFrameSize);
this.endPoint = endPoint;
this.consumerId = consumerId;
this.consumerGroupId = consumerGroupId;
+ this.heartbeatIntervalMs = heartbeatIntervalMs;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
}
boolean isAvailable() {
@@ -138,12 +144,15 @@ final class SubscriptionProvider extends
SubscriptionSession {
return;
}
- super.open(); // throw IoTDBConnectionException
+ super.open(false, connectionTimeoutInMs); // throw IoTDBConnectionException
- // TODO: pass the complete consumer parameter configuration to the server
final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId);
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+ consumerAttributes.put(
+ ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs));
+ consumerAttributes.put(
+ ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, String.valueOf(connectionTimeoutInMs));
final PipeSubscribeHandshakeResp resp =
handshake(new ConsumerConfig(consumerAttributes)); // throw SubscriptionException
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index cc56df89782..c1dd131490e 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -366,6 +366,12 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
return this;
}
+ @Override
+ public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) {
+ super.connectionTimeoutInMs(connectionTimeoutInMs);
+ return this;
+ }
+
@Override
public Builder maxPollParallelism(final int maxPollParallelism) {
super.maxPollParallelism(maxPollParallelism);
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index d3c34b7ecaf..2a413278090 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -299,6 +299,12 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
return this;
}
+ @Override
+ public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) {
+ super.connectionTimeoutInMs(connectionTimeoutInMs);
+ return this;
+ }
+
@Override
public Builder maxPollParallelism(final int maxPollParallelism) {
super.maxPollParallelism(maxPollParallelism);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
index 2e2f27b0a02..200728fe203 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.subscription.agent;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver;
import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiverV1;
@@ -36,6 +38,10 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class SubscriptionReceiverAgent {
@@ -54,10 +60,20 @@ public class SubscriptionReceiverAgent {
PipeSubscribeResponseType.ACK.getType());
private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new ThreadLocal<>();
+ private final Set<SubscriptionReceiver> activeReceivers = ConcurrentHashMap.newKeySet();
+ private final ScheduledExecutorService receiverTimeoutChecker =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ SubscriptionReceiverAgent.class.getSimpleName() + "-Timeout-Checker");
SubscriptionReceiverAgent() {
RECEIVER_CONSTRUCTORS.put(
PipeSubscribeRequestVersion.VERSION_1.getVersion(),
SubscriptionReceiverV1::new);
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ receiverTimeoutChecker,
+ this::checkReceiverTimeouts,
+ Math.max(1_000L, SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() / 2L),
+ Math.max(1_000L, SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() / 2L),
+ TimeUnit.MILLISECONDS);
}
public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
@@ -67,7 +83,10 @@ public class SubscriptionReceiverAgent {
final byte reqVersion = req.getVersion();
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
- return getReceiver(reqVersion).handle(req);
+ final SubscriptionReceiver receiver = getReceiver(reqVersion);
+ activeReceivers.add(receiver);
+ receiver.handleTimeout();
+ return receiver.handle(req);
} else {
final TSStatus status =
RpcUtils.getStatus(
@@ -126,8 +145,13 @@ public class SubscriptionReceiverAgent {
public final void handleClientExit() {
final SubscriptionReceiver receiver = receiverThreadLocal.get();
if (receiver != null) {
+ activeReceivers.remove(receiver);
receiver.handleExit();
receiverThreadLocal.remove();
}
}
+
+ private void checkReceiverTimeouts() {
+ activeReceivers.forEach(SubscriptionReceiver::handleTimeout);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
index c636e1b2dc6..36e3c9b74f5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
@@ -31,5 +31,7 @@ public interface SubscriptionReceiver {
void handleExit();
+ void handleTimeout();
+
long remainingMs();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 4c8033f1612..8cb09983689 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -98,6 +98,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionReceiverV1.class);
private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.9;
+ private static final long HEARTBEAT_TIMEOUT_MULTIPLIER = 3L;
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
@@ -112,41 +113,53 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new
ThreadLocal<>();
private final ThreadLocal<PollTimer> pollTimerThreadLocal = new
ThreadLocal<>();
+ private volatile ConsumerConfig sharedConsumerConfig;
+ private volatile boolean consumerInvalidated;
+ private volatile long lastActivityTimeMs = System.currentTimeMillis();
+ private final AtomicLong inFlightRequestCount = new AtomicLong(0);
@Override
public final TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
final short reqType = req.getType();
- if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) {
- switch (PipeSubscribeRequestType.valueOf(reqType)) {
- case HANDSHAKE:
- return
handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req));
- case HEARTBEAT:
- return
handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req));
- case SUBSCRIBE:
- return
handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req));
- case UNSUBSCRIBE:
- return handlePipeSubscribeUnsubscribe(
- PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req));
- case POLL:
- return
handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req));
- case COMMIT:
- return
handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req));
- case CLOSE:
- return
handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req));
- default:
- break;
+ beforeHandle(reqType);
+ try {
+ if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) {
+ switch (PipeSubscribeRequestType.valueOf(reqType)) {
+ case HANDSHAKE:
+ return handlePipeSubscribeHandshake(
+ PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req));
+ case HEARTBEAT:
+ return handlePipeSubscribeHeartbeat(
+ PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req));
+ case SUBSCRIBE:
+ return handlePipeSubscribeSubscribe(
+ PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req));
+ case UNSUBSCRIBE:
+ return handlePipeSubscribeUnsubscribe(
+ PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req));
+ case POLL:
+ return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req));
+ case COMMIT:
+ return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req));
+ case CLOSE:
+ return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req));
+ default:
+ break;
+ }
}
- }
- final TSStatus status =
- RpcUtils.getStatus(
- TSStatusCode.SUBSCRIPTION_TYPE_ERROR,
- String.format("Unknown PipeSubscribeRequestType %s.", reqType));
- LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status);
- return new TPipeSubscribeResp(
- status,
- PipeSubscribeResponseVersion.VERSION_1.getVersion(),
- PipeSubscribeResponseType.ACK.getType());
+ final TSStatus status =
+ RpcUtils.getStatus(
+ TSStatusCode.SUBSCRIPTION_TYPE_ERROR,
+ String.format("Unknown PipeSubscribeRequestType %s.", reqType));
+ LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status);
+ return new TPipeSubscribeResp(
+ status,
+ PipeSubscribeResponseVersion.VERSION_1.getVersion(),
+ PipeSubscribeResponseType.ACK.getType());
+ } finally {
+ inFlightRequestCount.decrementAndGet();
+ }
}
@Override
@@ -169,6 +182,41 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
unsubscribeCompleteTopics(consumerConfig);
consumerConfigThreadLocal.remove();
}
+ clearSharedConsumerState();
+ }
+
+ @Override
+ public void handleTimeout() {
+ final ConsumerConfig consumerConfig;
+ final long inactiveMs;
+ final long timeoutMs;
+ synchronized (this) {
+ consumerConfig = sharedConsumerConfig;
+ if (Objects.isNull(consumerConfig) || inFlightRequestCount.get() > 0) {
+ return;
+ }
+ timeoutMs = calculateConsumerInactivityTimeoutMs(consumerConfig);
+ inactiveMs = System.currentTimeMillis() - lastActivityTimeMs;
+ if (inactiveMs <= timeoutMs) {
+ return;
+ }
+ clearSharedConsumerState();
+ }
+
+ LOGGER.info(
+ "Subscription: consumer {} is inactive for {} ms, exceeding timeout {} ms, close it on server side.",
+ consumerConfig,
+ inactiveMs,
+ timeoutMs);
+ try {
+ closeConsumer(consumerConfig);
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Subscription: failed to close timed out consumer {} after {} ms inactivity",
+ consumerConfig,
+ inactiveMs,
+ e);
+ }
}
@Override
@@ -239,6 +287,8 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
consumerConfig);
}
+ activateConsumer(consumerConfig);
+
final int dataNodeId =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
LOGGER.info(
"Subscription: consumer {} handshake successfully, data node id: {}",
@@ -657,6 +707,9 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
}
closeConsumer(consumerConfig);
+ consumerConfigThreadLocal.remove();
+ pollTimerThreadLocal.remove();
+ clearSharedConsumerState();
return
PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
}
@@ -860,4 +913,37 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
throw new SubscriptionException(exceptionMessage);
}
}
+
+ private void beforeHandle(final short reqType) {
+ synchronized (this) {
+ if (consumerInvalidated) {
+ consumerConfigThreadLocal.remove();
+ pollTimerThreadLocal.remove();
+ if (PipeSubscribeRequestType.HANDSHAKE.getType() == reqType) {
+ consumerInvalidated = false;
+ }
+ }
+ inFlightRequestCount.incrementAndGet();
+ lastActivityTimeMs = System.currentTimeMillis();
+ }
+ }
+
+ private void activateConsumer(final ConsumerConfig consumerConfig) {
+ synchronized (this) {
+ sharedConsumerConfig = consumerConfig;
+ consumerInvalidated = false;
+ lastActivityTimeMs = System.currentTimeMillis();
+ }
+ }
+
+ private void clearSharedConsumerState() {
+ sharedConsumerConfig = null;
+ consumerInvalidated = true;
+ }
+
+ private long calculateConsumerInactivityTimeoutMs(final ConsumerConfig consumerConfig) {
+ return Math.max(
+ SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs(),
+ consumerConfig.getHeartbeatIntervalMs() * HEARTBEAT_TIMEOUT_MULTIPLIER);
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java
new file mode 100644
index 00000000000..ba0070187e3
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.receiver;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SubscriptionReceiverV1Test {
+
+ @Test
+ public void testHandleTimeoutKeepsRecentlyActiveConsumer() throws Exception {
+ final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1();
+ final ConsumerConfig consumerConfig = createConsumerConfig(1_000L);
+
+ setField(receiver, "sharedConsumerConfig", consumerConfig);
+ setField(receiver, "lastActivityTimeMs", System.currentTimeMillis() - 1_000L);
+
+ receiver.handleTimeout();
+
+ Assert.assertSame(consumerConfig, getField(receiver, "sharedConsumerConfig"));
+ Assert.assertFalse((boolean) getField(receiver, "consumerInvalidated"));
+ }
+
+ @Test
+ public void testHandleTimeoutSkipsConsumerWithInFlightRequests() throws Exception {
+ final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1();
+ final ConsumerConfig consumerConfig = createConsumerConfig(1_000L);
+
+ setField(receiver, "sharedConsumerConfig", consumerConfig);
+ setField(receiver, "lastActivityTimeMs", System.currentTimeMillis() - 15_000L);
+ ((AtomicLong) getField(receiver, "inFlightRequestCount")).set(1L);
+
+ receiver.handleTimeout();
+
+ Assert.assertSame(consumerConfig, getField(receiver, "sharedConsumerConfig"));
+ Assert.assertFalse((boolean) getField(receiver, "consumerInvalidated"));
+ }
+
+ @Test
+ public void testCalculateConsumerInactivityTimeoutUsesDefaultTimeout() throws Exception {
+ final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1();
+
+ Assert.assertEquals(
+ SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs(),
+ invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(1_000L)));
+ }
+
+ @Test
+ public void testCalculateConsumerInactivityTimeoutUsesHeartbeatMultiple() throws Exception {
+ final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1();
+
+ Assert.assertEquals(
+ 15_000L,
+ invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(5_000L)));
+ }
+
+ private long invokeCalculateConsumerInactivityTimeoutMs(
+ final SubscriptionReceiverV1 receiver, final ConsumerConfig consumerConfig) throws Exception {
+ final Method method =
+ SubscriptionReceiverV1.class.getDeclaredMethod(
+ "calculateConsumerInactivityTimeoutMs", ConsumerConfig.class);
+ method.setAccessible(true);
+ return (long) method.invoke(receiver, consumerConfig);
+ }
+
+ private ConsumerConfig createConsumerConfig(final long heartbeatIntervalMs) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ConsumerConstant.CONSUMER_ID_KEY, "consumer-" + UUID.randomUUID());
+ attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "group-" + UUID.randomUUID());
+ attributes.put(ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs));
+ return new ConsumerConfig(attributes);
+ }
+
+ private Object getField(final Object target, final String fieldName) throws Exception {
+ final Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(target);
+ }
+
+ private void setField(final Object target, final String fieldName, final Object value)
+ throws Exception {
+ final Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}