This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch codex/subscription-consumer-timeout in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 05ce96fb862e4b5222fb7db7560ecf1248a29c84 Author: VGalaxies <[email protected]> AuthorDate: Thu Mar 12 22:37:50 2026 +0800 feat(subscription): support consumer timeout and idle disconnect Expose connectionTimeoutInMs across subscription consumer builders and handshake/session setup. Close idle server-side consumers based on heartbeat inactivity and reuse closeConsumer to unsubscribe topics before dropping the consumer. --- .../rpc/subscription/config/ConsumerConfig.java | 12 ++ .../rpc/subscription/config/ConsumerConstant.java | 3 + .../iotdb/session/AbstractSessionBuilder.java | 1 + .../java/org/apache/iotdb/session/Session.java | 1 + .../subscription/SubscriptionSessionWrapper.java | 2 +- .../SubscriptionTableSessionBuilder.java | 5 + .../SubscriptionTreeSessionBuilder.java | 5 + .../base/AbstractSubscriptionConsumer.java | 16 ++- .../base/AbstractSubscriptionConsumerBuilder.java | 7 + .../base/AbstractSubscriptionProvider.java | 22 +++- .../AbstractSubscriptionPullConsumerBuilder.java | 7 + .../AbstractSubscriptionPushConsumerBuilder.java | 7 + .../consumer/table/SubscriptionTableProvider.java | 20 ++- .../table/SubscriptionTablePullConsumer.java | 13 +- .../SubscriptionTablePullConsumerBuilder.java | 7 + .../table/SubscriptionTablePushConsumer.java | 13 +- .../SubscriptionTablePushConsumerBuilder.java | 7 + .../consumer/tree/SubscriptionTreeProvider.java | 20 ++- .../tree/SubscriptionTreePullConsumer.java | 20 ++- .../tree/SubscriptionTreePullConsumerBuilder.java | 7 + .../tree/SubscriptionTreePushConsumer.java | 20 ++- .../tree/SubscriptionTreePushConsumerBuilder.java | 7 + .../agent/SubscriptionReceiverAgent.java | 26 +++- .../receiver/SubscriptionReceiver.java | 2 + .../receiver/SubscriptionReceiverV1.java | 144 ++++++++++++++++----- 25 files changed, 342 insertions(+), 52 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java index 100de1f1126..0b5c5a55477 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java @@ -80,6 +80,18 @@ public class ConsumerConfig extends PipeParameters { return getString(ConsumerConstant.SQL_DIALECT_KEY); } + public long getHeartbeatIntervalMs() { + return getLongOrDefault( + ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, + ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE); + } + + public int getConnectionTimeoutInMs() { + return getIntOrDefault( + ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, + ConsumerConstant.CONNECTION_TIMEOUT_MS_DEFAULT_VALUE); + } + public void setConsumerId(final String consumerId) { attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index dd0c583e40d..a0e6f9ed228 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -55,6 +55,9 @@ public class ConsumerConstant { public static final String THRIFT_MAX_FRAME_SIZE_KEY = "thrift-max-frame-size"; + public static final String CONNECTION_TIMEOUT_MS_KEY = "connection-timeout-ms"; + public static final int CONNECTION_TIMEOUT_MS_DEFAULT_VALUE = 0; + public static final String MAX_POLL_PARALLELISM_KEY = "max-poll-parallelism"; public static final int MAX_POLL_PARALLELISM_DEFAULT_VALUE = 1; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java index a8f18ce32bc..f6132d8041f 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java @@ -41,6 +41,7 @@ public abstract class AbstractSessionBuilder { public ZoneId zoneId = null; public int thriftDefaultBufferSize = SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY; public int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE; + public int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS; // this field only take effect in write request, nothing to do with any other type requests, // like query, load and so on. // if set to true, it means that we may redirect the write request to its corresponding leader diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 572e6792e7c..63825299407 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -470,6 +470,7 @@ public class Session implements ISession { this.zoneId = builder.zoneId; this.thriftDefaultBufferSize = builder.thriftDefaultBufferSize; this.thriftMaxFrameSize = builder.thriftMaxFrameSize; + this.connectionTimeoutInMs = builder.connectionTimeoutInMs; this.version = builder.version; this.useSSL = builder.useSSL; this.trustStore = builder.trustStore; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java index ec5a4d6cc99..58c0b4a957f 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java @@ -36,7 +36,7 @@ public final class SubscriptionSessionWrapper extends Session { } public void open() throws IoTDBConnectionException { - super.open(); + super.open(enableThriftRpcCompaction, connectionTimeoutInMs); } public void close() throws IoTDBConnectionException { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java index 41e26161c04..c1efd16557d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java @@ -58,6 +58,11 @@ public class SubscriptionTableSessionBuilder extends AbstractSessionBuilder { return this; } + public SubscriptionTableSessionBuilder connectionTimeoutInMs(final int connectionTimeoutInMs) { + super.connectionTimeoutInMs = connectionTimeoutInMs; + return this; + } + public ISubscriptionTableSession build() throws IoTDBConnectionException { final ISubscriptionTableSession session = new SubscriptionTableSession(this); session.open(); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java index dc21732ba97..8a1fa67d822 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java @@ -57,6 +57,11 @@ public class SubscriptionTreeSessionBuilder extends AbstractSessionBuilder { return this; } + public SubscriptionTreeSessionBuilder connectionTimeoutInMs(final int connectionTimeoutInMs) { + super.connectionTimeoutInMs = connectionTimeoutInMs; + return this; + } + public ISubscriptionTreeSession build() { return new SubscriptionTreeSession(this); } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java index a12340e9d76..e27f182549e 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java @@ -118,6 +118,7 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { private final Set<SubscriptionCommitContext> inFlightFilesCommitContextSet = new HashSet<>(); private final int thriftMaxFrameSize; + private final int connectionTimeoutInMs; private final int maxPollParallelism; @SuppressWarnings("java:S3077") @@ -187,6 +188,7 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { this.fileSaveFsync = builder.fileSaveFsync; this.thriftMaxFrameSize = builder.thriftMaxFrameSize; + this.connectionTimeoutInMs = builder.connectionTimeoutInMs; this.maxPollParallelism = builder.maxPollParallelism; } @@ -232,6 +234,11 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { properties.getOrDefault( ConsumerConstant.THRIFT_MAX_FRAME_SIZE_KEY, SessionConfig.DEFAULT_MAX_FRAME_SIZE)) + .connectionTimeoutInMs( + (Integer) + properties.getOrDefault( + ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, + SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS)) .maxPollParallelism( (Integer) properties.getOrDefault( @@ -382,7 +389,9 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize); + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs); AbstractSubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint) throws SubscriptionException { @@ -393,7 +402,9 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { this.password, this.consumerId, this.consumerGroupId, - this.thriftMaxFrameSize); + this.thriftMaxFrameSize, + this.heartbeatIntervalMs, + this.connectionTimeoutInMs); try { provider.handshake(); } catch (final Exception e) { @@ -1428,6 +1439,7 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { result.put("fileSaveFsync", String.valueOf(fileSaveFsync)); result.put("inFlightFilesCommitContextSet", inFlightFilesCommitContextSet.toString()); result.put("thriftMaxFrameSize", String.valueOf(thriftMaxFrameSize)); + result.put("connectionTimeoutInMs", String.valueOf(connectionTimeoutInMs)); result.put("maxPollParallelism", String.valueOf(maxPollParallelism)); result.put("subscribedTopics", subscribedTopics.toString()); return result; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java index 7f965069e73..81bfb6241c9 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java @@ -48,6 +48,7 @@ public class AbstractSubscriptionConsumerBuilder { protected boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE; protected int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE; + protected int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS; protected int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE; public AbstractSubscriptionConsumerBuilder host(final String host) { @@ -120,6 +121,12 @@ public class AbstractSubscriptionConsumerBuilder { return this; } + public AbstractSubscriptionConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0); + return this; + } + public AbstractSubscriptionConsumerBuilder maxPollParallelism(final int maxPollParallelism) { // Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a // single thread to execute poll whenever there are idle resources available, thereby diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java index 9bf119c76c4..7f3582d195d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java @@ -90,13 +90,16 @@ public abstract class AbstractSubscriptionProvider { private final String username; private final String password; + private final long heartbeatIntervalMs; + private final int connectionTimeoutInMs; protected abstract AbstractSessionBuilder constructSubscriptionSessionBuilder( final String host, final int port, final String username, final String password, - final int thriftMaxFrameSize); + final int thriftMaxFrameSize, + final int connectionTimeoutInMs); protected AbstractSubscriptionProvider( final TEndPoint endPoint, @@ -104,16 +107,25 @@ public abstract class AbstractSubscriptionProvider { final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { this.session = new SubscriptionSessionWrapper( constructSubscriptionSessionBuilder( - endPoint.ip, endPoint.port, username, password, thriftMaxFrameSize)); + endPoint.ip, + endPoint.port, + username, + password, + thriftMaxFrameSize, + connectionTimeoutInMs)); this.endPoint = endPoint; this.consumerId = consumerId; this.consumerGroupId = consumerGroupId; this.username = username; this.password = password; + this.heartbeatIntervalMs = heartbeatIntervalMs; + this.connectionTimeoutInMs = connectionTimeoutInMs; } SubscriptionSessionConnection getSessionConnection() throws IoTDBConnectionException { @@ -164,6 +176,10 @@ public abstract class AbstractSubscriptionProvider { consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username); consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password); consumerAttributes.put(ConsumerConstant.SQL_DIALECT_KEY, session.getSqlDialect()); + consumerAttributes.put( + ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs)); + consumerAttributes.put( + ConsumerConstant.CONNECTION_TIMEOUT_MS_KEY, String.valueOf(connectionTimeoutInMs)); final PipeSubscribeHandshakeResp resp = handshake(new ConsumerConfig(consumerAttributes)); // throw SubscriptionException diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java index 3d9561adab2..7083a7dc4af 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java @@ -102,6 +102,13 @@ public class AbstractSubscriptionPullConsumerBuilder extends AbstractSubscriptio return this; } + @Override + public AbstractSubscriptionPullConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public AbstractSubscriptionPullConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java index 9dba1c39897..f013b98dd19 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java @@ -108,6 +108,13 @@ public class AbstractSubscriptionPushConsumerBuilder extends AbstractSubscriptio return this; } + @Override + public AbstractSubscriptionPushConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public AbstractSubscriptionPushConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java index 40492876b88..1b90866db9e 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java @@ -32,8 +32,18 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { - super(endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { + super( + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } @Override @@ -42,12 +52,14 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { final int port, final String username, final String password, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final int connectionTimeoutInMs) { return new SubscriptionTableSessionBuilder() .host(host) .port(port) .username(username) .password(password) - .thriftMaxFrameSize(thriftMaxFrameSize); + .thriftMaxFrameSize(thriftMaxFrameSize) + .connectionTimeoutInMs(connectionTimeoutInMs); } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java index 9e51f7438ff..83dd39aebbf 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java @@ -44,9 +44,18 @@ public class SubscriptionTablePullConsumer extends AbstractSubscriptionPullConsu final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { return new SubscriptionTableProvider( - endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } /////////////////////////////// ctor /////////////////////////////// diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java index b85b669876a..6d8437ac95f 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java @@ -99,6 +99,13 @@ public class SubscriptionTablePullConsumerBuilder extends AbstractSubscriptionPu return this; } + @Override + public SubscriptionTablePullConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public SubscriptionTablePullConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java index 4fc2c352af8..ac44e421dac 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java @@ -40,9 +40,18 @@ public class SubscriptionTablePushConsumer extends AbstractSubscriptionPushConsu final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { return new SubscriptionTableProvider( - endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } /////////////////////////////// ctor /////////////////////////////// diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java index fcd62b235e2..c372c586db3 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java @@ -101,6 +101,13 @@ public class SubscriptionTablePushConsumerBuilder extends AbstractSubscriptionPu return this; } + @Override + public SubscriptionTablePushConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public SubscriptionTablePushConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java index 56b07667f2d..c79b64e8c84 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java @@ -32,8 +32,18 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { - super(endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { + super( + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } @Override @@ -42,12 +52,14 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { final int port, final String username, final String password, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final int connectionTimeoutInMs) { return new SubscriptionTreeSessionBuilder() .host(host) .port(port) .username(username) .password(password) - .thriftMaxFrameSize(thriftMaxFrameSize); + .thriftMaxFrameSize(thriftMaxFrameSize) + .connectionTimeoutInMs(connectionTimeoutInMs); } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java index 713dd601e2d..23050893f66 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java @@ -51,9 +51,18 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { return new SubscriptionTreeProvider( - endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } /////////////////////////////// ctor /////////////////////////////// @@ -78,6 +87,7 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum .fileSaveDir(builder.fileSaveDir) .fileSaveFsync(builder.fileSaveFsync) .thriftMaxFrameSize(builder.thriftMaxFrameSize) + .connectionTimeoutInMs(builder.connectionTimeoutInMs) .maxPollParallelism(builder.maxPollParallelism) .autoCommit(builder.autoCommit) .autoCommitIntervalMs(builder.autoCommitIntervalMs)); @@ -233,6 +243,7 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum private boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE; private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE; + private int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS; private int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE; private boolean autoCommit = ConsumerConstant.AUTO_COMMIT_DEFAULT_VALUE; @@ -306,6 +317,11 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum return this; } + public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) { + this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0); + return this; + } + public Builder maxPollParallelism(final int maxPollParallelism) { // Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a // single thread to execute poll whenever there are idle resources available, thereby diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java index f3d3b7afba8..8623a492087 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java @@ -99,6 +99,13 @@ public class SubscriptionTreePullConsumerBuilder extends AbstractSubscriptionPul return this; } + @Override + public SubscriptionTreePullConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public SubscriptionTreePullConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java index cd5c548121e..d56e89d47c8 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java @@ -50,9 +50,18 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum final String password, final String consumerId, final String consumerGroupId, - final int thriftMaxFrameSize) { + final int thriftMaxFrameSize, + final long heartbeatIntervalMs, + final int connectionTimeoutInMs) { return new SubscriptionTreeProvider( - endPoint, username, password, consumerId, consumerGroupId, thriftMaxFrameSize); + endPoint, + username, + password, + consumerId, + consumerGroupId, + thriftMaxFrameSize, + heartbeatIntervalMs, + connectionTimeoutInMs); } /////////////////////////////// ctor /////////////////////////////// @@ -77,6 +86,7 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum .fileSaveDir(builder.fileSaveDir) .fileSaveFsync(builder.fileSaveFsync) .thriftMaxFrameSize(builder.thriftMaxFrameSize) + .connectionTimeoutInMs(builder.connectionTimeoutInMs) .maxPollParallelism(builder.maxPollParallelism) .ackStrategy(builder.ackStrategy) .consumeListener(builder.consumeListener) @@ -187,6 +197,7 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum private boolean fileSaveFsync = ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE; private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE; + private int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS; private int maxPollParallelism = ConsumerConstant.MAX_POLL_PARALLELISM_DEFAULT_VALUE; private AckStrategy ackStrategy = AckStrategy.defaultValue(); @@ -263,6 +274,11 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum return this; } + public Builder connectionTimeoutInMs(final int connectionTimeoutInMs) { + this.connectionTimeoutInMs = Math.max(connectionTimeoutInMs, 0); + return this; + } + public Builder maxPollParallelism(final int maxPollParallelism) { // Here the minimum value of max poll parallelism is set to 1 instead of 0, in order to use a // single thread to execute poll whenever there are idle resources available, thereby diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java index 44fde0ed4f0..dd0cb017637 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java @@ -101,6 +101,13 @@ public class SubscriptionTreePushConsumerBuilder extends AbstractSubscriptionPus return this; } + @Override + public SubscriptionTreePushConsumerBuilder connectionTimeoutInMs( + final int connectionTimeoutInMs) { + super.connectionTimeoutInMs(connectionTimeoutInMs); + return this; + } + @Override public SubscriptionTreePushConsumerBuilder maxPollParallelism(final int maxPollParallelism) { super.maxPollParallelism(maxPollParallelism); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java index 2e2f27b0a02..200728fe203 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.subscription.agent; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver; import org.apache.iotdb.db.subscription.receiver.SubscriptionReceiverV1; @@ -36,6 +38,10 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public class SubscriptionReceiverAgent { @@ -54,10 +60,20 @@ public class SubscriptionReceiverAgent { PipeSubscribeResponseType.ACK.getType()); private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new ThreadLocal<>(); + private final Set<SubscriptionReceiver> activeReceivers = ConcurrentHashMap.newKeySet(); + private final ScheduledExecutorService receiverTimeoutChecker = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + SubscriptionReceiverAgent.class.getSimpleName() + "-Timeout-Checker"); SubscriptionReceiverAgent() { RECEIVER_CONSTRUCTORS.put( PipeSubscribeRequestVersion.VERSION_1.getVersion(), SubscriptionReceiverV1::new); + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + receiverTimeoutChecker, + this::checkReceiverTimeouts, + Math.max(1_000L, SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() / 2L), + Math.max(1_000L, SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() / 2L), + TimeUnit.MILLISECONDS); } public TPipeSubscribeResp handle(final TPipeSubscribeReq req) { @@ -67,7 +83,10 @@ public class SubscriptionReceiverAgent { final byte reqVersion = req.getVersion(); if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) { - return getReceiver(reqVersion).handle(req); + final SubscriptionReceiver receiver = getReceiver(reqVersion); + activeReceivers.add(receiver); + receiver.handleTimeout(); + return receiver.handle(req); } else { final TSStatus status = RpcUtils.getStatus( @@ -126,8 +145,13 @@ public class SubscriptionReceiverAgent { public final void handleClientExit() { final SubscriptionReceiver receiver = receiverThreadLocal.get(); if (receiver != null) { + activeReceivers.remove(receiver); receiver.handleExit(); receiverThreadLocal.remove(); } } + + private void checkReceiverTimeouts() { + activeReceivers.forEach(SubscriptionReceiver::handleTimeout); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java index c636e1b2dc6..36e3c9b74f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java @@ -31,5 +31,7 @@ public interface SubscriptionReceiver { void handleExit(); + void handleTimeout(); + long remainingMs(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 203b93ef1e4..bfcbbaf850f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -98,6 +98,7 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverV1.class); private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.9; + private static final long HEARTBEAT_TIMEOUT_MULTIPLIER = 3L; private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance(); @@ -112,43 +113,55 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new ThreadLocal<>(); private final ThreadLocal<PollTimer> pollTimerThreadLocal = new ThreadLocal<>(); + private volatile ConsumerConfig sharedConsumerConfig; + private volatile boolean consumerInvalidated; + private volatile long lastActivityTimeMs = System.currentTimeMillis(); + private final AtomicLong inFlightRequestCount = new AtomicLong(0); private static final String SQL_DIALECT_TABLE_VALUE = "table"; @Override public final TPipeSubscribeResp handle(final TPipeSubscribeReq req) { final short reqType = req.getType(); - if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) { - switch (PipeSubscribeRequestType.valueOf(reqType)) { - case HANDSHAKE: - return handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req)); - case HEARTBEAT: - return handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req)); - case SUBSCRIBE: - return handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req)); - case UNSUBSCRIBE: - return handlePipeSubscribeUnsubscribe( - PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req)); - case POLL: - return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req)); - case COMMIT: - return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req)); - case CLOSE: - return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req)); - default: - break; + beforeHandle(reqType); + try { + if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) { + switch (PipeSubscribeRequestType.valueOf(reqType)) { + case HANDSHAKE: + return handlePipeSubscribeHandshake( + PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req)); + case HEARTBEAT: + return handlePipeSubscribeHeartbeat( + PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req)); + case SUBSCRIBE: + return handlePipeSubscribeSubscribe( + PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req)); + case UNSUBSCRIBE: + return handlePipeSubscribeUnsubscribe( + PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req)); + case POLL: + return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req)); + case COMMIT: + return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req)); + case CLOSE: + return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req)); + default: + break; + } } - } - final TSStatus status = - RpcUtils.getStatus( - TSStatusCode.SUBSCRIPTION_TYPE_ERROR, - String.format("Unknown PipeSubscribeRequestType %s.", reqType)); - LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status); - return new TPipeSubscribeResp( - status, - PipeSubscribeResponseVersion.VERSION_1.getVersion(), - PipeSubscribeResponseType.ACK.getType()); + final TSStatus status = + RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_TYPE_ERROR, + String.format("Unknown PipeSubscribeRequestType %s.", reqType)); + LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status); + return new TPipeSubscribeResp( + status, + PipeSubscribeResponseVersion.VERSION_1.getVersion(), + PipeSubscribeResponseType.ACK.getType()); + } finally { + inFlightRequestCount.decrementAndGet(); + } } @Override @@ -171,6 +184,41 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { unsubscribeCompleteTopics(consumerConfig); consumerConfigThreadLocal.remove(); } + clearSharedConsumerState(); + } + + @Override + public void handleTimeout() { + final ConsumerConfig consumerConfig; + final long inactiveMs; + final long timeoutMs; + synchronized (this) { + consumerConfig = sharedConsumerConfig; + if (Objects.isNull(consumerConfig) || inFlightRequestCount.get() > 0) { + return; + } + timeoutMs = calculateConsumerInactivityTimeoutMs(consumerConfig); + inactiveMs = System.currentTimeMillis() - lastActivityTimeMs; + if (inactiveMs <= timeoutMs) { + return; + } + clearSharedConsumerState(); + } + + LOGGER.info( + "Subscription: consumer {} is inactive for {} ms, exceeding timeout {} ms, close it on server side.", + consumerConfig, + inactiveMs, + timeoutMs); + try { + closeConsumer(consumerConfig); + } catch (final Exception e) { + LOGGER.warn( + "Subscription: failed to close timed out consumer {} after {} ms inactivity", + consumerConfig, + inactiveMs, + e); + } } @Override @@ -241,6 +289,8 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { consumerConfig); } + activateConsumer(consumerConfig); + final int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); LOGGER.info( "Subscription: consumer {} handshake successfully, data node id: {}", @@ -659,6 +709,9 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { } closeConsumer(consumerConfig); + consumerConfigThreadLocal.remove(); + pollTimerThreadLocal.remove(); + clearSharedConsumerState(); return PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS); } @@ -866,4 +919,37 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { throw new SubscriptionException(exceptionMessage); } } + + private void beforeHandle(final short reqType) { + synchronized (this) { + if (consumerInvalidated) { + consumerConfigThreadLocal.remove(); + pollTimerThreadLocal.remove(); + if (PipeSubscribeRequestType.HANDSHAKE.getType() == reqType) { + consumerInvalidated = false; + } + } + inFlightRequestCount.incrementAndGet(); + lastActivityTimeMs = System.currentTimeMillis(); + } + } + + private void activateConsumer(final ConsumerConfig consumerConfig) { + synchronized (this) { + sharedConsumerConfig = consumerConfig; + consumerInvalidated = false; + lastActivityTimeMs = System.currentTimeMillis(); + } + } + + private void clearSharedConsumerState() { + sharedConsumerConfig = null; + consumerInvalidated = true; + } + + private long calculateConsumerInactivityTimeoutMs(final ConsumerConfig consumerConfig) { + return Math.max( + SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs(), + consumerConfig.getHeartbeatIntervalMs() * HEARTBEAT_TIMEOUT_MULTIPLIER); + } }
