This is an automated email from the ASF dual-hosted git repository.

vgalaxies pushed a commit to branch codex/subscription-consumer-timeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 05ce96fb862e4b5222fb7db7560ecf1248a29c84
Author: VGalaxies <[email protected]>
AuthorDate: Thu Mar 12 22:37:50 2026 +0800

    feat(subscription): support consumer timeout and idle disconnect
    
    Expose connectionTimeoutInMs across subscription consumer builders and
    handshake/session setup.
    
    Close idle server-side consumers based on heartbeat inactivity and reuse
    closeConsumer to unsubscribe topics before dropping the consumer.
---
 .../rpc/subscription/config/ConsumerConfig.java    |  12 ++
 .../rpc/subscription/config/ConsumerConstant.java  |   3 +
 .../iotdb/session/AbstractSessionBuilder.java      |   1 +
 .../java/org/apache/iotdb/session/Session.java     |   1 +
 .../subscription/SubscriptionSessionWrapper.java   |   2 +-
 .../SubscriptionTableSessionBuilder.java           |   5 +
 .../SubscriptionTreeSessionBuilder.java            |   5 +
 .../base/AbstractSubscriptionConsumer.java         |  16 ++-
 .../base/AbstractSubscriptionConsumerBuilder.java  |   7 +
 .../base/AbstractSubscriptionProvider.java         |  22 +++-
 .../AbstractSubscriptionPullConsumerBuilder.java   |   7 +
 .../AbstractSubscriptionPushConsumerBuilder.java   |   7 +
 .../consumer/table/SubscriptionTableProvider.java  |  20 ++-
 .../table/SubscriptionTablePullConsumer.java       |  13 +-
 .../SubscriptionTablePullConsumerBuilder.java      |   7 +
 .../table/SubscriptionTablePushConsumer.java       |  13 +-
 .../SubscriptionTablePushConsumerBuilder.java      |   7 +
 .../consumer/tree/SubscriptionTreeProvider.java    |  20 ++-
 .../tree/SubscriptionTreePullConsumer.java         |  20 ++-
 .../tree/SubscriptionTreePullConsumerBuilder.java  |   7 +
 .../tree/SubscriptionTreePushConsumer.java         |  20 ++-
 .../tree/SubscriptionTreePushConsumerBuilder.java  |   7 +
 .../agent/SubscriptionReceiverAgent.java           |  26 +++-
 .../receiver/SubscriptionReceiver.java             |   2 +
 .../receiver/SubscriptionReceiverV1.java           | 144 ++++++++++++++++-----
 25 files changed, 342 insertions(+), 52 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 100de1f1126..0b5c5a55477 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -80,6 +80,18 @@ public class ConsumerConfig extends PipeParameters {
     return getString(ConsumerConstant.SQL_DIALECT_KEY);
   }
 
+  public long getHeartbeatIntervalMs() {
+    return getLongOrDefault(
+        ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
+        ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE);
+  }
+
+  public int getConnectionTimeoutInMs() {
+    return getIntOrDefault(
+        ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY,
+        ConsumerConstant.CONNECTION_TIMEOUT_MS_DEFAULT_VALUE);
+  }
+
   public void setConsumerId(final String consumerId) {
     attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
   }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index dd0c583e40d..a0e6f9ed228 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -55,6 +55,9 @@ public class ConsumerConstant {
 
   public static final String THRIFT_MAX_FRAME_SIZE_KEY = 
"thrift-max-frame-size";
 
+  public static final String CONNECTION_TIMEOUT_MS_KEY = 
"connection-timeout-ms";
+  public static final int CONNECTION_TIMEOUT_MS_DEFAULT_VALUE = 0;
+
   public static final String MAX_POLL_PARALLELISM_KEY = "max-poll-parallelism";
   public static final int MAX_POLL_PARALLELISM_DEFAULT_VALUE = 1;
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java
index a8f18ce32bc..f6132d8041f 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java
@@ -41,6 +41,7 @@ public abstract class AbstractSessionBuilder {
   public ZoneId zoneId = null;
   public int thriftDefaultBufferSize = 
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY;
   public int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
+  public int connectionTimeoutInMs = 
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
   // this field only take effect in write request, nothing to do with any 
other type requests,
   // like query, load and so on.
   // if set to true, it means that we may redirect the write request to its 
corresponding leader
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 572e6792e7c..63825299407 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -470,6 +470,7 @@ public class Session implements ISession {
     this.zoneId = builder.zoneId;
     this.thriftDefaultBufferSize = builder.thriftDefaultBufferSize;
     this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
+    this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
     this.version = builder.version;
     this.useSSL = builder.useSSL;
     this.trustStore = builder.trustStore;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java
index ec5a4d6cc99..58c0b4a957f 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java
@@ -36,7 +36,7 @@ public final class SubscriptionSessionWrapper extends Session 
{
   }
 
   public void open() throws IoTDBConnectionException {
-    super.open();
+    super.open(enableThriftRpcCompaction, connectionTimeoutInMs);
   }
 
   public void close() throws IoTDBConnectionException {
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java
index 41e26161c04..c1efd16557d 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java
@@ -58,6 +58,11 @@ public class SubscriptionTableSessionBuilder extends 
AbstractSessionBuilder {
     return this;
   }
 
+  public SubscriptionTableSessionBuilder connectionTimeoutInMs(final int 
connectionTimeoutInMs) {
+    super.connectionTimeoutInMs = connectionTimeoutInMs;
+    return this;
+  }
+
   public ISubscriptionTableSession build() throws IoTDBConnectionException {
     final ISubscriptionTableSession session = new 
SubscriptionTableSession(this);
     session.open();
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java
index dc21732ba97..8a1fa67d822 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java
@@ -57,6 +57,11 @@ public class SubscriptionTreeSessionBuilder extends 
AbstractSessionBuilder {
     return this;
   }
 
+  public SubscriptionTreeSessionBuilder connectionTimeoutInMs(final int 
connectionTimeoutInMs) {
+    super.connectionTimeoutInMs = connectionTimeoutInMs;
+    return this;
+  }
+
   public ISubscriptionTreeSession build() {
     return new SubscriptionTreeSession(this);
   }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index a12340e9d76..e27f182549e 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -118,6 +118,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
   private final Set<SubscriptionCommitContext> inFlightFilesCommitContextSet = 
new HashSet<>();
 
   private final int thriftMaxFrameSize;
+  private final int connectionTimeoutInMs;
   private final int maxPollParallelism;
 
   @SuppressWarnings("java:S3077")
@@ -187,6 +188,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
     this.fileSaveFsync = builder.fileSaveFsync;
 
     this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
+    this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
     this.maxPollParallelism = builder.maxPollParallelism;
   }
 
@@ -232,6 +234,11 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
                     properties.getOrDefault(
                         ConsumerConstant.THRIFT_MAX_FRAME_SIZE_KEY,
                         SessionConfig.DEFAULT_MAX_FRAME_SIZE))
+            .connectionTimeoutInMs(
+                (Integer)
+                    properties.getOrDefault(
+                        ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY,
+                        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS))
             .maxPollParallelism(
                 (Integer)
                     properties.getOrDefault(
@@ -382,7 +389,9 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize);
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs);
 
   AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint 
endPoint)
       throws SubscriptionException {
@@ -393,7 +402,9 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
             this.password,
             this.consumerId,
             this.consumerGroupId,
-            this.thriftMaxFrameSize);
+            this.thriftMaxFrameSize,
+            this.heartbeatIntervalMs,
+            this.connectionTimeoutInMs);
     try {
       provider.handshake();
     } catch (final Exception e) {
@@ -1428,6 +1439,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
     result.put("fileSaveFsync", String.valueOf(fileSaveFsync));
     result.put("inFlightFilesCommitContextSet", 
inFlightFilesCommitContextSet.toString());
     result.put("thriftMaxFrameSize", String.valueOf(thriftMaxFrameSize));
+    result.put("connectionTimeoutInMs", String.valueOf(connectionTimeoutInMs));
     result.put("maxPollParallelism", String.valueOf(maxPollParallelism));
     result.put("subscribedTopics", subscribedTopics.toString());
     return result;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
index 7f965069e73..81bfb6241c9 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
@@ -48,6 +48,7 @@ public class AbstractSubscriptionConsumerBuilder {
   protected boolean fileSaveFsync = 
ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE;
 
   protected int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
+  protected int connectionTimeoutInMs = 
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
   protected int maxPollParallelism = 
ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE;
 
   public AbstractSubscriptionConsumerBuilder host(final String host) {
@@ -120,6 +121,12 @@ public class AbstractSubscriptionConsumerBuilder {
     return this;
   }
 
+  public AbstractSubscriptionConsumerBuilder connectionTimeoutInMs(
+      final int connectionTimeoutInMs) {
+    this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0);
+    return this;
+  }
+
   public AbstractSubscriptionConsumerBuilder maxPollParallelism(final int 
maxPollParallelism) {
     // Here the minimum value of max poll parallelism is set to 1 instead of 
0, in order to use a
     // single thread to execute poll whenever there are idle resources 
available, thereby
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 9bf119c76c4..7f3582d195d 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -90,13 +90,16 @@ public abstract class AbstractSubscriptionProvider {
 
   private final String username;
   private final String password;
+  private final long heartbeatIntervalMs;
+  private final int connectionTimeoutInMs;
 
   protected abstract AbstractSessionBuilder 
constructSubscriptionSessionBuilder(
       final String host,
       final int port,
       final String username,
       final String password,
-      final int thriftMaxFrameSize);
+      final int thriftMaxFrameSize,
+      final int connectionTimeoutInMs);
 
   protected AbstractSubscriptionProvider(
       final TEndPoint endPoint,
@@ -104,16 +107,25 @@ public abstract class AbstractSubscriptionProvider {
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize) {
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs) {
     this.session =
         new SubscriptionSessionWrapper(
             constructSubscriptionSessionBuilder(
-                endPoint.ip, endPoint.port, username, password, 
thriftMaxFrameSize));
+                endPoint.ip,
+                endPoint.port,
+                username,
+                password,
+                thriftMaxFrameSize,
+                connectionTimeoutInMs));
     this.endPoint = endPoint;
     this.consumerId = consumerId;
     this.consumerGroupId = consumerGroupId;
     this.username = username;
     this.password = password;
+    this.heartbeatIntervalMs = heartbeatIntervalMs;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
   SubscriptionSessionConnection getSessionConnection() throws 
IoTDBConnectionException {
@@ -164,6 +176,10 @@ public abstract class AbstractSubscriptionProvider {
     consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
     consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
     consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY, 
session.getSqlDialect());
+    consumerAttributes.put(
+        ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, 
String.valueOf(heartbeatIntervalMs));
+    consumerAttributes.put(
+        ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, 
String.valueOf(connectionTimeoutInMs));
 
     final PipeSubscribeHandshakeResp resp =
         handshake(new ConsumerConfig(consumerAttributes)); // throw 
SubscriptionException
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
index 3d9561adab2..7083a7dc4af 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java
@@ -102,6 +102,13 @@ public class AbstractSubscriptionPullConsumerBuilder 
extends AbstractSubscriptio
     return this;
   }
 
+  @Override
+  public AbstractSubscriptionPullConsumerBuilder connectionTimeoutInMs(
+      final int connectionTimeoutInMs) {
+    super.connectionTimeoutInMs(connectionTimeoutInMs);
+    return this;
+  }
+
   @Override
   public AbstractSubscriptionPullConsumerBuilder maxPollParallelism(final int 
maxPollParallelism) {
     super.maxPollParallelism(maxPollParallelism);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
index 9dba1c39897..f013b98dd19 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java
@@ -108,6 +108,13 @@ public class AbstractSubscriptionPushConsumerBuilder 
extends AbstractSubscriptio
     return this;
   }
 
+  @Override
+  public AbstractSubscriptionPushConsumerBuilder connectionTimeoutInMs(
+      final int connectionTimeoutInMs) {
+    super.connectionTimeoutInMs(connectionTimeoutInMs);
+    return this;
+  }
+
   @Override
   public AbstractSubscriptionPushConsumerBuilder maxPollParallelism(final int 
maxPollParallelism) {
     super.maxPollParallelism(maxPollParallelism);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
index 40492876b88..1b90866db9e 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
@@ -32,8 +32,18 @@ final class SubscriptionTableProvider extends 
AbstractSubscriptionProvider {
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize) {
-    super(endPoint, username, password, consumerId, consumerGroupId, 
thriftMaxFrameSize);
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs) {
+    super(
+        endPoint,
+        username,
+        password,
+        consumerId,
+        consumerGroupId,
+        thriftMaxFrameSize,
+        heartbeatIntervalMs,
+        connectionTimeoutInMs);
   }
 
   @Override
@@ -42,12 +52,14 @@ final class SubscriptionTableProvider extends 
AbstractSubscriptionProvider {
       final int port,
       final String username,
       final String password,
-      final int thriftMaxFrameSize) {
+      final int thriftMaxFrameSize,
+      final int connectionTimeoutInMs) {
     return new SubscriptionTableSessionBuilder()
         .host(host)
         .port(port)
         .username(username)
         .password(password)
-        .thriftMaxFrameSize(thriftMaxFrameSize);
+        .thriftMaxFrameSize(thriftMaxFrameSize)
+        .connectionTimeoutInMs(connectionTimeoutInMs);
   }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
index 9e51f7438ff..83dd39aebbf 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
@@ -44,9 +44,18 @@ public class SubscriptionTablePullConsumer extends 
AbstractSubscriptionPullConsu
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize) {
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs) {
     return new SubscriptionTableProvider(
-        endPoint, username, password, consumerId, consumerGroupId, 
thriftMaxFrameSize);
+        endPoint,
+        username,
+        password,
+        consumerId,
+        consumerGroupId,
+        thriftMaxFrameSize,
+        heartbeatIntervalMs,
+        connectionTimeoutInMs);
   }
 
   /////////////////////////////// ctor ///////////////////////////////
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
index b85b669876a..6d8437ac95f 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
@@ -99,6 +99,13 @@ public class SubscriptionTablePullConsumerBuilder extends 
AbstractSubscriptionPu
     return this;
   }
 
+  @Override
+  public SubscriptionTablePullConsumerBuilder connectionTimeoutInMs(
+      final int connectionTimeoutInMs) {
+    super.connectionTimeoutInMs(connectionTimeoutInMs);
+    return this;
+  }
+
   @Override
   public SubscriptionTablePullConsumerBuilder maxPollParallelism(final int 
maxPollParallelism) {
     super.maxPollParallelism(maxPollParallelism);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
index 4fc2c352af8..ac44e421dac 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
@@ -40,9 +40,18 @@ public class SubscriptionTablePushConsumer extends 
AbstractSubscriptionPushConsu
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize) {
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs) {
     return new SubscriptionTableProvider(
-        endPoint, username, password, consumerId, consumerGroupId, 
thriftMaxFrameSize);
+        endPoint,
+        username,
+        password,
+        consumerId,
+        consumerGroupId,
+        thriftMaxFrameSize,
+        heartbeatIntervalMs,
+        connectionTimeoutInMs);
   }
 
   /////////////////////////////// ctor ///////////////////////////////
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
index fcd62b235e2..c372c586db3 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
@@ -101,6 +101,13 @@ public class SubscriptionTablePushConsumerBuilder extends 
AbstractSubscriptionPu
     return this;
   }
 
+  @Override
+  public SubscriptionTablePushConsumerBuilder connectionTimeoutInMs(
+      final int connectionTimeoutInMs) {
+    super.connectionTimeoutInMs(connectionTimeoutInMs);
+    return this;
+  }
+
   @Override
   public SubscriptionTablePushConsumerBuilder maxPollParallelism(final int 
maxPollParallelism) {
     super.maxPollParallelism(maxPollParallelism);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
index 56b07667f2d..c79b64e8c84 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
@@ -32,8 +32,18 @@ final class SubscriptionTreeProvider extends 
AbstractSubscriptionProvider {
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize) {
-    super(endPoint, username, password, consumerId, consumerGroupId, 
thriftMaxFrameSize);
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs) {
+    super(
+        endPoint,
+        username,
+        password,
+        consumerId,
+        consumerGroupId,
+        thriftMaxFrameSize,
+        heartbeatIntervalMs,
+        connectionTimeoutInMs);
   }
 
   @Override
@@ -42,12 +52,14 @@ final class SubscriptionTreeProvider extends 
AbstractSubscriptionProvider {
       final int port,
       final String username,
       final String password,
-      final int thriftMaxFrameSize) {
+      final int thriftMaxFrameSize,
+      final int connectionTimeoutInMs) {
     return new SubscriptionTreeSessionBuilder()
         .host(host)
         .port(port)
         .username(username)
         .password(password)
-        .thriftMaxFrameSize(thriftMaxFrameSize);
+        .thriftMaxFrameSize(thriftMaxFrameSize)
+        .connectionTimeoutInMs(connectionTimeoutInMs);
   }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
index 713dd601e2d..23050893f66 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
@@ -51,9 +51,18 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize) {
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs) {
     return new SubscriptionTreeProvider(
-        endPoint, username, password, consumerId, consumerGroupId, 
thriftMaxFrameSize);
+        endPoint,
+        username,
+        password,
+        consumerId,
+        consumerGroupId,
+        thriftMaxFrameSize,
+        heartbeatIntervalMs,
+        connectionTimeoutInMs);
   }
 
   /////////////////////////////// ctor ///////////////////////////////
@@ -78,6 +87,7 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
             .fileSaveDir(builder.fileSaveDir)
             .fileSaveFsync(builder.fileSaveFsync)
             .thriftMaxFrameSize(builder.thriftMaxFrameSize)
+            .connectionTimeoutInMs(builder.connectionTimeoutInMs)
             .maxPollParallelism(builder.maxPollParallelism)
             .autoCommit(builder.autoCommit)
             .autoCommitIntervalMs(builder.autoCommitIntervalMs));
@@ -233,6 +243,7 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
     private boolean fileSaveFsync = 
ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE;
 
     private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
+    private int connectionTimeoutInMs = 
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
     private int maxPollParallelism = 
ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE;
 
     private boolean autoCommit = ConsumerConstant.AUTO_COMMIT_DEFAULT_VALUE;
@@ -306,6 +317,11 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
       return this;
     }
 
+    public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) {
+      this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0);
+      return this;
+    }
+
     public Builder maxPollParallelism(final int maxPollParallelism) {
       // Here the minimum value of max poll parallelism is set to 1 instead of 
0, in order to use a
       // single thread to execute poll whenever there are idle resources 
available, thereby
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
index f3d3b7afba8..8623a492087 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
@@ -99,6 +99,13 @@ public class SubscriptionTreePullConsumerBuilder extends 
AbstractSubscriptionPul
     return this;
   }
 
+  @Override
+  public SubscriptionTreePullConsumerBuilder connectionTimeoutInMs(
+      final int connectionTimeoutInMs) {
+    super.connectionTimeoutInMs(connectionTimeoutInMs);
+    return this;
+  }
+
   @Override
   public SubscriptionTreePullConsumerBuilder maxPollParallelism(final int 
maxPollParallelism) {
     super.maxPollParallelism(maxPollParallelism);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
index cd5c548121e..d56e89d47c8 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
@@ -50,9 +50,18 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize) {
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs) {
     return new SubscriptionTreeProvider(
-        endPoint, username, password, consumerId, consumerGroupId, 
thriftMaxFrameSize);
+        endPoint,
+        username,
+        password,
+        consumerId,
+        consumerGroupId,
+        thriftMaxFrameSize,
+        heartbeatIntervalMs,
+        connectionTimeoutInMs);
   }
 
   /////////////////////////////// ctor ///////////////////////////////
@@ -77,6 +86,7 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
             .fileSaveDir(builder.fileSaveDir)
             .fileSaveFsync(builder.fileSaveFsync)
             .thriftMaxFrameSize(builder.thriftMaxFrameSize)
+            .connectionTimeoutInMs(builder.connectionTimeoutInMs)
             .maxPollParallelism(builder.maxPollParallelism)
             .ackStrategy(builder.ackStrategy)
             .consumeListener(builder.consumeListener)
@@ -187,6 +197,7 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
     private boolean fileSaveFsync = 
ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE;
 
     private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
+    private int connectionTimeoutInMs = 
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
     private int maxPollParallelism = 
ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE;
 
     private AckStrategy ackStrategy = AckStrategy.defaultValue();
@@ -263,6 +274,11 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
       return this;
     }
 
+    /**
+     * Sets the connection timeout, in milliseconds, used by the consumer being built.
+     *
+     * <p>Negative values are stored as 0.
+     *
+     * @param connectionTimeoutInMs requested timeout in milliseconds
+     * @return this builder, for chaining
+     */
+    public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) {
+      this.connectionTimeoutInMs = connectionTimeoutInMs < 0 ? 0 : connectionTimeoutInMs;
+      return this;
+    }
+
     public Builder maxPollParallelism(final int maxPollParallelism) {
       // Here the minimum value of max poll parallelism is set to 1 instead of 
0, in order to use a
       // single thread to execute poll whenever there are idle resources 
available, thereby
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
index 44fde0ed4f0..dd0cb017637 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
@@ -101,6 +101,13 @@ public class SubscriptionTreePushConsumerBuilder extends 
AbstractSubscriptionPus
     return this;
   }
 
+  /**
+   * Sets the connection timeout in milliseconds by delegating to the parent builder, and returns
+   * this builder typed as {@link SubscriptionTreePushConsumerBuilder} so calls can keep chaining.
+   *
+   * @param connectionTimeoutInMs requested timeout in milliseconds
+   * @return this builder
+   */
+  @Override
+  public SubscriptionTreePushConsumerBuilder connectionTimeoutInMs(
+      final int connectionTimeoutInMs) {
+    super.connectionTimeoutInMs(connectionTimeoutInMs);
+    return this;
+  }
+
   @Override
   public SubscriptionTreePushConsumerBuilder maxPollParallelism(final int 
maxPollParallelism) {
     super.maxPollParallelism(maxPollParallelism);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
index 2e2f27b0a02..200728fe203 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.subscription.agent;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver;
 import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiverV1;
@@ -36,6 +38,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 public class SubscriptionReceiverAgent {
@@ -54,10 +60,20 @@ public class SubscriptionReceiverAgent {
           PipeSubscribeResponseType.ACK.getType());
 
   private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new 
ThreadLocal<>();
+  private final Set<SubscriptionReceiver> activeReceivers = 
ConcurrentHashMap.newKeySet();
+  private final ScheduledExecutorService receiverTimeoutChecker =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          SubscriptionReceiverAgent.class.getSimpleName() + 
"-Timeout-Checker");
 
   SubscriptionReceiverAgent() {
     RECEIVER_CONSTRUCTORS.put(
         PipeSubscribeRequestVersion.VERSION_1.getVersion(), 
SubscriptionReceiverV1::new);
+    // Sweep for idle receivers at half the default subscription timeout, but never more often
+    // than once per second. The interval is computed once so that the initial delay and the
+    // period are guaranteed to be the same value.
+    final long defaultTimeoutMs =
+        SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs();
+    final long checkIntervalMs = Math.max(1_000L, defaultTimeoutMs / 2L);
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        receiverTimeoutChecker,
+        this::checkReceiverTimeouts,
+        checkIntervalMs,
+        checkIntervalMs,
+        TimeUnit.MILLISECONDS);
   }
 
   public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
@@ -67,7 +83,10 @@ public class SubscriptionReceiverAgent {
 
     final byte reqVersion = req.getVersion();
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
-      return getReceiver(reqVersion).handle(req);
+      final SubscriptionReceiver receiver = getReceiver(reqVersion);
+      activeReceivers.add(receiver);
+      receiver.handleTimeout();
+      return receiver.handle(req);
     } else {
       final TSStatus status =
           RpcUtils.getStatus(
@@ -126,8 +145,13 @@ public class SubscriptionReceiverAgent {
   public final void handleClientExit() {
     final SubscriptionReceiver receiver = receiverThreadLocal.get();
     if (receiver != null) {
+      // Deregister before the exit cleanup so the periodic timeout check stops visiting this
+      // receiver once the client is gone.
+      activeReceivers.remove(receiver);
       receiver.handleExit();
       receiverThreadLocal.remove();
     }
   }
+
+  /**
+   * Periodic task that invokes {@link SubscriptionReceiver#handleTimeout()} on every receiver
+   * currently registered in {@code activeReceivers}.
+   *
+   * <p>Each receiver is checked independently: a failure while checking one receiver is logged
+   * and does not prevent the remaining receivers from being checked in the same sweep.
+   */
+  private void checkReceiverTimeouts() {
+    for (final SubscriptionReceiver receiver : activeReceivers) {
+      try {
+        receiver.handleTimeout();
+      } catch (final Exception e) {
+        LOGGER.warn("Subscription: failed to check timeout for receiver {}", receiver, e);
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
index c636e1b2dc6..36e3c9b74f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
@@ -31,5 +31,7 @@ public interface SubscriptionReceiver {
 
   void handleExit();
 
+  void handleTimeout();
+
   long remainingMs();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 203b93ef1e4..bfcbbaf850f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -98,6 +98,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionReceiverV1.class);
 
   private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.9;
+  private static final long HEARTBEAT_TIMEOUT_MULTIPLIER = 3L;
 
   private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
       ConfigNodeClientManager.getInstance();
@@ -112,43 +113,55 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
 
   private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new 
ThreadLocal<>();
   private final ThreadLocal<PollTimer> pollTimerThreadLocal = new 
ThreadLocal<>();
+  private volatile ConsumerConfig sharedConsumerConfig;
+  private volatile boolean consumerInvalidated;
+  private volatile long lastActivityTimeMs = System.currentTimeMillis();
+  private final AtomicLong inFlightRequestCount = new AtomicLong(0);
 
   private static final String SQL_DIALECT_TABLE_VALUE = "table";
 
   @Override
   public final TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
     final short reqType = req.getType();
-    if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) {
-      switch (PipeSubscribeRequestType.valueOf(reqType)) {
-        case HANDSHAKE:
-          return 
handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req));
-        case HEARTBEAT:
-          return 
handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req));
-        case SUBSCRIBE:
-          return 
handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req));
-        case UNSUBSCRIBE:
-          return handlePipeSubscribeUnsubscribe(
-              PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req));
-        case POLL:
-          return 
handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req));
-        case COMMIT:
-          return 
handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req));
-        case CLOSE:
-          return 
handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req));
-        default:
-          break;
+    beforeHandle(reqType);
+    try {
+      if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) {
+        switch (PipeSubscribeRequestType.valueOf(reqType)) {
+          case HANDSHAKE:
+            return handlePipeSubscribeHandshake(
+                PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req));
+          case HEARTBEAT:
+            return handlePipeSubscribeHeartbeat(
+                PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req));
+          case SUBSCRIBE:
+            return handlePipeSubscribeSubscribe(
+                PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req));
+          case UNSUBSCRIBE:
+            return handlePipeSubscribeUnsubscribe(
+                PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req));
+          case POLL:
+            return 
handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req));
+          case COMMIT:
+            return 
handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req));
+          case CLOSE:
+            return 
handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req));
+          default:
+            break;
+        }
       }
-    }
 
-    final TSStatus status =
-        RpcUtils.getStatus(
-            TSStatusCode.SUBSCRIPTION_TYPE_ERROR,
-            String.format("Unknown PipeSubscribeRequestType %s.", reqType));
-    LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response 
status = {}.", status);
-    return new TPipeSubscribeResp(
-        status,
-        PipeSubscribeResponseVersion.VERSION_1.getVersion(),
-        PipeSubscribeResponseType.ACK.getType());
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.SUBSCRIPTION_TYPE_ERROR,
+              String.format("Unknown PipeSubscribeRequestType %s.", reqType));
+      LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response 
status = {}.", status);
+      return new TPipeSubscribeResp(
+          status,
+          PipeSubscribeResponseVersion.VERSION_1.getVersion(),
+          PipeSubscribeResponseType.ACK.getType());
+    } finally {
+      inFlightRequestCount.decrementAndGet();
+    }
   }
 
   @Override
@@ -171,6 +184,41 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
       unsubscribeCompleteTopics(consumerConfig);
       consumerConfigThreadLocal.remove();
     }
+    clearSharedConsumerState();
+  }
+
+  /**
+   * Closes the consumer tracked by this receiver when it has been inactive for too long.
+   *
+   * <p>Does nothing when no consumer is tracked, when at least one request is currently in
+   * flight, or when the time since the last recorded activity does not exceed the value returned
+   * by {@code calculateConsumerInactivityTimeoutMs}. Failures while closing the consumer are
+   * logged and not propagated.
+   */
+  @Override
+  public void handleTimeout() {
+    final ConsumerConfig consumerConfig;
+    final long inactiveMs;
+    final long timeoutMs;
+    // The decision is made under this receiver's monitor, the same one beforeHandle and
+    // activateConsumer hold while updating the in-flight count, activity time and shared config.
+    synchronized (this) {
+      consumerConfig = sharedConsumerConfig;
+      if (Objects.isNull(consumerConfig) || inFlightRequestCount.get() > 0) {
+        return;
+      }
+      timeoutMs = calculateConsumerInactivityTimeoutMs(consumerConfig);
+      inactiveMs = System.currentTimeMillis() - lastActivityTimeMs;
+      if (inactiveMs <= timeoutMs) {
+        return;
+      }
+      // Invalidate while still holding the lock; the actual close below runs outside of it.
+      clearSharedConsumerState();
+    }
+
+    LOGGER.info(
+        "Subscription: consumer {} is inactive for {} ms, exceeding timeout {} 
ms, close it on server side.",
+        consumerConfig,
+        inactiveMs,
+        timeoutMs);
+    try {
+      closeConsumer(consumerConfig);
+    } catch (final Exception e) {
+      // Best effort: a failed close is only logged so the caller is never interrupted by it.
+      LOGGER.warn(
+          "Subscription: failed to close timed out consumer {} after {} ms 
inactivity",
+          consumerConfig,
+          inactiveMs,
+          e);
+    }
+  }
 
   @Override
@@ -241,6 +289,8 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
           consumerConfig);
     }
 
+    activateConsumer(consumerConfig);
+
     final int dataNodeId = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
     LOGGER.info(
         "Subscription: consumer {} handshake successfully, data node id: {}",
@@ -659,6 +709,9 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     }
 
     closeConsumer(consumerConfig);
+    consumerConfigThreadLocal.remove();
+    pollTimerThreadLocal.remove();
+    clearSharedConsumerState();
     return 
PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
   }
 
@@ -866,4 +919,37 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
       throw new SubscriptionException(exceptionMessage);
     }
   }
+
+  /**
+   * Bookkeeping performed at the start of every request, under this receiver's monitor.
+   *
+   * <p>If the consumer has been invalidated, the thread-local consumer config and poll timer are
+   * discarded, and the invalidated flag is reset only when the incoming request is a handshake.
+   * In all cases the request is counted as in flight and the last activity time is refreshed.
+   *
+   * @param reqType raw type code of the incoming request
+   */
+  private synchronized void beforeHandle(final short reqType) {
+    if (consumerInvalidated) {
+      // Drop whatever this thread still caches about the invalidated consumer.
+      consumerConfigThreadLocal.remove();
+      pollTimerThreadLocal.remove();
+      final boolean isHandshake = PipeSubscribeRequestType.HANDSHAKE.getType() == reqType;
+      if (isHandshake) {
+        consumerInvalidated = false;
+      }
+    }
+    inFlightRequestCount.incrementAndGet();
+    lastActivityTimeMs = System.currentTimeMillis();
+  }
+
+  /**
+   * Starts tracking the given consumer for inactivity checks: records its config as the shared
+   * config, clears the invalidated flag and refreshes the last activity time, all under this
+   * receiver's monitor.
+   *
+   * @param consumerConfig config of the consumer to track
+   */
+  private synchronized void activateConsumer(final ConsumerConfig consumerConfig) {
+    sharedConsumerConfig = consumerConfig;
+    consumerInvalidated = false;
+    lastActivityTimeMs = System.currentTimeMillis();
+  }
+
+  /**
+   * Stops tracking the current consumer for inactivity checks and flags the receiver as
+   * invalidated, so that {@code beforeHandle} discards the thread-local consumer state on the
+   * next request.
+   *
+   * <p>NOTE(review): this method does not take the receiver's monitor itself; {@code
+   * handleTimeout} calls it while holding the monitor, other call sites appear to call it
+   * without -- confirm this is intended.
+   */
+  private void clearSharedConsumerState() {
+    sharedConsumerConfig = null;
+    consumerInvalidated = true;
+  }
+
+  /**
+   * Returns how long, in milliseconds, a consumer may stay inactive before it is closed: the
+   * larger of the default subscription timeout and {@code HEARTBEAT_TIMEOUT_MULTIPLIER} times the
+   * consumer's heartbeat interval.
+   *
+   * <p>The heartbeat-based bound saturates at {@link Long#MAX_VALUE} / {@link Long#MIN_VALUE}
+   * instead of wrapping around, so an extremely large heartbeat interval cannot overflow into a
+   * negative value and silently fall back to the default timeout.
+   *
+   * @param consumerConfig config of the consumer whose timeout is computed
+   * @return the inactivity timeout in milliseconds
+   */
+  private long calculateConsumerInactivityTimeoutMs(final ConsumerConfig consumerConfig) {
+    final long heartbeatIntervalMs = consumerConfig.getHeartbeatIntervalMs();
+    final long heartbeatTimeoutMs;
+    if (heartbeatIntervalMs > Long.MAX_VALUE / HEARTBEAT_TIMEOUT_MULTIPLIER) {
+      heartbeatTimeoutMs = Long.MAX_VALUE;
+    } else if (heartbeatIntervalMs < Long.MIN_VALUE / HEARTBEAT_TIMEOUT_MULTIPLIER) {
+      heartbeatTimeoutMs = Long.MIN_VALUE;
+    } else {
+      heartbeatTimeoutMs = heartbeatIntervalMs * HEARTBEAT_TIMEOUT_MULTIPLIER;
+    }
+    return Math.max(
+        SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs(), heartbeatTimeoutMs);
+  }
 }


Reply via email to