This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new eacd93b0675 feat(subscription): support consumer timeout and idle disconnect (#17300)
eacd93b0675 is described below

commit eacd93b0675e07cfb82ada79af3db9fef497c3c6
Author: VGalaxies <[email protected]>
AuthorDate: Thu Mar 19 15:38:11 2026 +0800

    feat(subscription): support consumer timeout and idle disconnect (#17300)
---
 .../rpc/subscription/config/ConsumerConfig.java    |  12 ++
 .../rpc/subscription/config/ConsumerConstant.java  |   3 +
 .../consumer/SubscriptionConsumer.java             |  18 ++-
 .../consumer/SubscriptionProvider.java             |  15 ++-
 .../consumer/SubscriptionPullConsumer.java         |   6 +
 .../consumer/SubscriptionPushConsumer.java         |   6 +
 .../agent/SubscriptionReceiverAgent.java           |  26 +++-
 .../receiver/SubscriptionReceiver.java             |   2 +
 .../receiver/SubscriptionReceiverV1.java           | 144 ++++++++++++++++-----
 .../receiver/SubscriptionReceiverV1Test.java       | 114 ++++++++++++++++
 10 files changed, 312 insertions(+), 34 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 3bcb984732c..5094ae7eea4 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -68,6 +68,18 @@ public class ConsumerConfig extends PipeParameters {
     return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
   }
 
+  public long getHeartbeatIntervalMs() {
+    return getLongOrDefault(
+        ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
+        ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE);
+  }
+
+  public int getConnectionTimeoutInMs() {
+    return getIntOrDefault(
+        ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY,
+        ConsumerConstant.CONNECTION_TIMEOUT_MS_DEFAULT_VALUE);
+  }
+
   public void setConsumerId(final String consumerId) {
     attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
   }
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index 5ad1d9ba435..504893b80ed 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -52,6 +52,9 @@ public class ConsumerConstant {
 
   public static final String THRIFT_MAX_FRAME_SIZE_KEY = "thrift-max-frame-size";
 
+  public static final String CONNECTION_TIMEOUT_MS_KEY = "connection-timeout-ms";
+  public static final int CONNECTION_TIMEOUT_MS_DEFAULT_VALUE = 0;
+
   public static final String MAX_POLL_PARALLELISM_KEY = "max-poll-parallelism";
   public static final int MAX_POLL_PARALLELISM_DEFAULT_VALUE = 1;
 
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index feeb4543eb7..df12472f78f 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -118,6 +118,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {
   private final Set<SubscriptionCommitContext> inFlightFilesCommitContextSet = new HashSet<>();
 
   private final int thriftMaxFrameSize;
+  private final int connectionTimeoutInMs;
   private final int maxPollParallelism;
 
   @SuppressWarnings("java:S3077")
@@ -187,6 +188,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {
     this.fileSaveFsync = builder.fileSaveFsync;
 
     this.thriftMaxFrameSize = builder.thriftMaxFrameSize;
+    this.connectionTimeoutInMs = builder.connectionTimeoutInMs;
     this.maxPollParallelism = builder.maxPollParallelism;
   }
 
@@ -231,6 +233,11 @@ abstract class SubscriptionConsumer implements AutoCloseable {
                     properties.getOrDefault(
                         ConsumerConstant.THRIFT_MAX_FRAME_SIZE_KEY,
                         SessionConfig.DEFAULT_MAX_FRAME_SIZE))
+            .connectionTimeoutInMs(
+                (Integer)
+                    properties.getOrDefault(
+                        ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY,
+                        SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS))
             .maxPollParallelism(
                 (Integer)
                     properties.getOrDefault(
@@ -381,7 +388,9 @@ abstract class SubscriptionConsumer implements AutoCloseable {
             this.password,
             this.consumerId,
             this.consumerGroupId,
-            this.thriftMaxFrameSize);
+            this.thriftMaxFrameSize,
+            this.heartbeatIntervalMs,
+            this.connectionTimeoutInMs);
     try {
       provider.handshake();
     } catch (final Exception e) {
@@ -1397,6 +1406,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {
     protected boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE;
 
     protected int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
+    protected int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
     protected int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE;
 
     public Builder host(final String host) {
@@ -1467,6 +1477,11 @@ abstract class SubscriptionConsumer implements AutoCloseable {
       return this;
     }
 
+    public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) {
+      this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0);
+      return this;
+    }
+
     public Builder maxPollParallelism(final int maxPollParallelism) {
       // Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a
       // single thread to execute poll whenever there are idle resources available, thereby
@@ -1509,6 +1524,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {
     result.put("fileSaveFsync", String.valueOf(fileSaveFsync));
     result.put("inFlightFilesCommitContextSet", inFlightFilesCommitContextSet.toString());
     result.put("thriftMaxFrameSize", String.valueOf(thriftMaxFrameSize));
+    result.put("connectionTimeoutInMs", String.valueOf(connectionTimeoutInMs));
     result.put("maxPollParallelism", String.valueOf(maxPollParallelism));
     result.put("subscribedTopics", subscribedTopics.toString());
     return result;
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index 03eefeaddba..9f153efeb10 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -83,6 +83,8 @@ final class SubscriptionProvider extends SubscriptionSession {
   private final AtomicBoolean isAvailable = new AtomicBoolean(false);
 
   private final TEndPoint endPoint;
+  private final long heartbeatIntervalMs;
+  private final int connectionTimeoutInMs;
   private int dataNodeId;
 
   SubscriptionProvider(
@@ -91,12 +93,16 @@ final class SubscriptionProvider extends SubscriptionSession {
       final String password,
       final String consumerId,
       final String consumerGroupId,
-      final int thriftMaxFrameSize) {
+      final int thriftMaxFrameSize,
+      final long heartbeatIntervalMs,
+      final int connectionTimeoutInMs) {
     super(endPoint.ip, endPoint.port, username, password, thriftMaxFrameSize);
 
     this.endPoint = endPoint;
     this.consumerId = consumerId;
     this.consumerGroupId = consumerGroupId;
+    this.heartbeatIntervalMs = heartbeatIntervalMs;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
   boolean isAvailable() {
@@ -138,12 +144,15 @@ final class SubscriptionProvider extends SubscriptionSession {
       return;
     }
 
-    super.open(); // throw IoTDBConnectionException
+    super.open(false, connectionTimeoutInMs); // throw IoTDBConnectionException
 
-    // TODO: pass the complete consumer parameter configuration to the server
     final Map<String, String> consumerAttributes = new HashMap<>();
     consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId);
     consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+    consumerAttributes.put(
+        ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs));
+    consumerAttributes.put(
+        ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, String.valueOf(connectionTimeoutInMs));
 
     final PipeSubscribeHandshakeResp resp =
         handshake(new ConsumerConfig(consumerAttributes)); // throw SubscriptionException
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index cc56df89782..c1dd131490e 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -366,6 +366,12 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
       return this;
     }
 
+    @Override
+    public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) {
+      super.connectionTimeoutInMs(connectionTimeoutInMs);
+      return this;
+    }
+
     @Override
     public Builder maxPollParallelism(final int maxPollParallelism) {
       super.maxPollParallelism(maxPollParallelism);
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index d3c34b7ecaf..2a413278090 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -299,6 +299,12 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
       return this;
     }
 
+    @Override
+    public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) {
+      super.connectionTimeoutInMs(connectionTimeoutInMs);
+      return this;
+    }
+
     @Override
     public Builder maxPollParallelism(final int maxPollParallelism) {
       super.maxPollParallelism(maxPollParallelism);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
index 2e2f27b0a02..200728fe203 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.subscription.agent;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver;
 import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiverV1;
@@ -36,6 +38,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 public class SubscriptionReceiverAgent {
@@ -54,10 +60,20 @@ public class SubscriptionReceiverAgent {
           PipeSubscribeResponseType.ACK.getType());
 
   private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new ThreadLocal<>();
+  private final Set<SubscriptionReceiver> activeReceivers = ConcurrentHashMap.newKeySet();
+  private final ScheduledExecutorService receiverTimeoutChecker =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          SubscriptionReceiverAgent.class.getSimpleName() + "-Timeout-Checker");
 
   SubscriptionReceiverAgent() {
     RECEIVER_CONSTRUCTORS.put(
         PipeSubscribeRequestVersion.VERSION_1.getVersion(), SubscriptionReceiverV1::new);
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        receiverTimeoutChecker,
+        this::checkReceiverTimeouts,
+        Math.max(1_000L, SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() / 2L),
+        Math.max(1_000L, SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() / 2L),
+        TimeUnit.MILLISECONDS);
   }
 
   public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
@@ -67,7 +83,10 @@ public class SubscriptionReceiverAgent {
 
     final byte reqVersion = req.getVersion();
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
-      return getReceiver(reqVersion).handle(req);
+      final SubscriptionReceiver receiver = getReceiver(reqVersion);
+      activeReceivers.add(receiver);
+      receiver.handleTimeout();
+      return receiver.handle(req);
     } else {
       final TSStatus status =
           RpcUtils.getStatus(
@@ -126,8 +145,13 @@ public class SubscriptionReceiverAgent {
   public final void handleClientExit() {
     final SubscriptionReceiver receiver = receiverThreadLocal.get();
     if (receiver != null) {
+      activeReceivers.remove(receiver);
       receiver.handleExit();
       receiverThreadLocal.remove();
     }
   }
+
+  private void checkReceiverTimeouts() {
+    activeReceivers.forEach(SubscriptionReceiver::handleTimeout);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
index c636e1b2dc6..36e3c9b74f5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java
@@ -31,5 +31,7 @@ public interface SubscriptionReceiver {
 
   void handleExit();
 
+  void handleTimeout();
+
   long remainingMs();
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 4c8033f1612..8cb09983689 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -98,6 +98,7 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
   private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverV1.class);
 
   private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.9;
+  private static final long HEARTBEAT_TIMEOUT_MULTIPLIER = 3L;
 
   private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER =
       ConfigNodeClientManager.getInstance();
@@ -112,41 +113,53 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
 
   private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new ThreadLocal<>();
   private final ThreadLocal<PollTimer> pollTimerThreadLocal = new ThreadLocal<>();
+  private volatile ConsumerConfig sharedConsumerConfig;
+  private volatile boolean consumerInvalidated;
+  private volatile long lastActivityTimeMs = System.currentTimeMillis();
+  private final AtomicLong inFlightRequestCount = new AtomicLong(0);
 
   @Override
   public final TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
     final short reqType = req.getType();
-    if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) {
-      switch (PipeSubscribeRequestType.valueOf(reqType)) {
-        case HANDSHAKE:
-          return handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req));
-        case HEARTBEAT:
-          return handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req));
-        case SUBSCRIBE:
-          return handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req));
-        case UNSUBSCRIBE:
-          return handlePipeSubscribeUnsubscribe(
-              PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req));
-        case POLL:
-          return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req));
-        case COMMIT:
-          return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req));
-        case CLOSE:
-          return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req));
-        default:
-          break;
+    beforeHandle(reqType);
+    try {
+      if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) {
+        switch (PipeSubscribeRequestType.valueOf(reqType)) {
+          case HANDSHAKE:
+            return handlePipeSubscribeHandshake(
+                PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req));
+          case HEARTBEAT:
+            return handlePipeSubscribeHeartbeat(
+                PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req));
+          case SUBSCRIBE:
+            return handlePipeSubscribeSubscribe(
+                PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req));
+          case UNSUBSCRIBE:
+            return handlePipeSubscribeUnsubscribe(
+                PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req));
+          case POLL:
+            return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req));
+          case COMMIT:
+            return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req));
+          case CLOSE:
+            return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req));
+          default:
+            break;
+        }
       }
-    }
 
-    final TSStatus status =
-        RpcUtils.getStatus(
-            TSStatusCode.SUBSCRIPTION_TYPE_ERROR,
-            String.format("Unknown PipeSubscribeRequestType %s.", reqType));
-    LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status);
-    return new TPipeSubscribeResp(
-        status,
-        PipeSubscribeResponseVersion.VERSION_1.getVersion(),
-        PipeSubscribeResponseType.ACK.getType());
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.SUBSCRIPTION_TYPE_ERROR,
+              String.format("Unknown PipeSubscribeRequestType %s.", reqType));
+      LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status);
+      return new TPipeSubscribeResp(
+          status,
+          PipeSubscribeResponseVersion.VERSION_1.getVersion(),
+          PipeSubscribeResponseType.ACK.getType());
+    } finally {
+      inFlightRequestCount.decrementAndGet();
+    }
   }
 
   @Override
@@ -169,6 +182,41 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
       unsubscribeCompleteTopics(consumerConfig);
       consumerConfigThreadLocal.remove();
     }
+    clearSharedConsumerState();
+  }
+
+  @Override
+  public void handleTimeout() {
+    final ConsumerConfig consumerConfig;
+    final long inactiveMs;
+    final long timeoutMs;
+    synchronized (this) {
+      consumerConfig = sharedConsumerConfig;
+      if (Objects.isNull(consumerConfig) || inFlightRequestCount.get() > 0) {
+        return;
+      }
+      timeoutMs = calculateConsumerInactivityTimeoutMs(consumerConfig);
+      inactiveMs = System.currentTimeMillis() - lastActivityTimeMs;
+      if (inactiveMs <= timeoutMs) {
+        return;
+      }
+      clearSharedConsumerState();
+    }
+
+    LOGGER.info(
+        "Subscription: consumer {} is inactive for {} ms, exceeding timeout {} ms, close it on server side.",
+        consumerConfig,
+        inactiveMs,
+        timeoutMs);
+    try {
+      closeConsumer(consumerConfig);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Subscription: failed to close timed out consumer {} after {} ms inactivity",
+          consumerConfig,
+          inactiveMs,
+          e);
+    }
   }
 
   @Override
@@ -239,6 +287,8 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
           consumerConfig);
     }
 
+    activateConsumer(consumerConfig);
+
     final int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
     LOGGER.info(
         "Subscription: consumer {} handshake successfully, data node id: {}",
@@ -657,6 +707,9 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
     }
 
     closeConsumer(consumerConfig);
+    consumerConfigThreadLocal.remove();
+    pollTimerThreadLocal.remove();
+    clearSharedConsumerState();
     return PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
   }
 
@@ -860,4 +913,37 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
       throw new SubscriptionException(exceptionMessage);
     }
   }
+
+  private void beforeHandle(final short reqType) {
+    synchronized (this) {
+      if (consumerInvalidated) {
+        consumerConfigThreadLocal.remove();
+        pollTimerThreadLocal.remove();
+        if (PipeSubscribeRequestType.HANDSHAKE.getType() == reqType) {
+          consumerInvalidated = false;
+        }
+      }
+      inFlightRequestCount.incrementAndGet();
+      lastActivityTimeMs = System.currentTimeMillis();
+    }
+  }
+
+  private void activateConsumer(final ConsumerConfig consumerConfig) {
+    synchronized (this) {
+      sharedConsumerConfig = consumerConfig;
+      consumerInvalidated = false;
+      lastActivityTimeMs = System.currentTimeMillis();
+    }
+  }
+
+  private void clearSharedConsumerState() {
+    sharedConsumerConfig = null;
+    consumerInvalidated = true;
+  }
+
+  private long calculateConsumerInactivityTimeoutMs(final ConsumerConfig consumerConfig) {
+    return Math.max(
+        SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs(),
+        consumerConfig.getHeartbeatIntervalMs() * HEARTBEAT_TIMEOUT_MULTIPLIER);
+  }
 }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java
new file mode 100644
index 00000000000..ba0070187e3
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.receiver;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SubscriptionReceiverV1Test {
+
+  @Test
+  public void testHandleTimeoutKeepsRecentlyActiveConsumer() throws Exception {
+    final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1();
+    final ConsumerConfig consumerConfig = createConsumerConfig(1_000L);
+
+    setField(receiver, "sharedConsumerConfig", consumerConfig);
+    setField(receiver, "lastActivityTimeMs", System.currentTimeMillis() - 1_000L);
+
+    receiver.handleTimeout();
+
+    Assert.assertSame(consumerConfig, getField(receiver, "sharedConsumerConfig"));
+    Assert.assertFalse((boolean) getField(receiver, "consumerInvalidated"));
+  }
+
+  @Test
+  public void testHandleTimeoutSkipsConsumerWithInFlightRequests() throws Exception {
+    final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1();
+    final ConsumerConfig consumerConfig = createConsumerConfig(1_000L);
+
+    setField(receiver, "sharedConsumerConfig", consumerConfig);
+    setField(receiver, "lastActivityTimeMs", System.currentTimeMillis() - 15_000L);
+    ((AtomicLong) getField(receiver, "inFlightRequestCount")).set(1L);
+
+    receiver.handleTimeout();
+
+    Assert.assertSame(consumerConfig, getField(receiver, "sharedConsumerConfig"));
+    Assert.assertFalse((boolean) getField(receiver, "consumerInvalidated"));
+  }
+
+  @Test
+  public void testCalculateConsumerInactivityTimeoutUsesDefaultTimeout() throws Exception {
+    final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1();
+
+    Assert.assertEquals(
+        SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs(),
+        invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(1_000L)));
+  }
+
+  @Test
+  public void testCalculateConsumerInactivityTimeoutUsesHeartbeatMultiple() throws Exception {
+    final SubscriptionReceiverV1 receiver = new SubscriptionReceiverV1();
+
+    Assert.assertEquals(
+        15_000L,
+        invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(5_000L)));
+  }
+
+  private long invokeCalculateConsumerInactivityTimeoutMs(
+      final SubscriptionReceiverV1 receiver, final ConsumerConfig consumerConfig) throws Exception {
+    final Method method =
+        SubscriptionReceiverV1.class.getDeclaredMethod(
+            "calculateConsumerInactivityTimeoutMs", ConsumerConfig.class);
+    method.setAccessible(true);
+    return (long) method.invoke(receiver, consumerConfig);
+  }
+
+  private ConsumerConfig createConsumerConfig(final long heartbeatIntervalMs) {
+    final Map<String, String> attributes = new HashMap<>();
+    attributes.put(ConsumerConstant.CONSUMER_ID_KEY, "consumer-" + UUID.randomUUID());
+    attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "group-" + UUID.randomUUID());
+    attributes.put(ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs));
+    return new ConsumerConfig(attributes);
+  }
+
+  private Object getField(final Object target, final String fieldName) throws Exception {
+    final Field field = target.getClass().getDeclaredField(fieldName);
+    field.setAccessible(true);
+    return field.get(target);
+  }
+
+  private void setField(final Object target, final String fieldName, final Object value)
+      throws Exception {
+    final Field field = target.getClass().getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+}

Reply via email to