This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a9b05b211f Subscription: shared thread pool between consumers in one process (#12606)
0a9b05b211f is described below

commit 0a9b05b211f57f53c3128b687ccd83be81e7618f
Author: V_Galaxy <[email protected]>
AuthorDate: Thu May 30 21:16:08 2024 +0800

    Subscription: shared thread pool between consumers in one process (#12606)
---
 .../apache/iotdb/SubscriptionSessionExample.java   |   2 +-
 .../it/dual/IoTDBSubscriptionConsumerGroupIT.java  |   6 +-
 .../it/dual/IoTDBSubscriptionTopicIT.java          |   3 +-
 .../it/local/IoTDBSubscriptionBasicIT.java         |  11 +-
 .../it/local/IoTDBSubscriptionIdempotentIT.java    |   2 +-
 .../it/local/IoTDBSubscriptionRestartIT.java       |   2 +-
 .../rpc/subscription/config/ConsumerConstant.java  |  32 +-
 .../subscription/{ => consumer}/AckStrategy.java   |   2 +-
 .../{ => consumer}/AsyncCommitCallback.java        |   2 +-
 .../{ => consumer}/ConsumeListener.java            |   2 +-
 .../subscription/{ => consumer}/ConsumeResult.java |   2 +-
 .../{ => consumer}/SubscriptionConsumer.java       | 376 +++++++++------------
 .../SubscriptionExecutorServiceManager.java        | 281 +++++++++++++++
 .../{ => consumer}/SubscriptionProvider.java       |   4 +-
 .../{ => consumer}/SubscriptionProviders.java      |  21 +-
 .../{ => consumer}/SubscriptionPullConsumer.java   | 160 ++++-----
 .../{ => consumer}/SubscriptionPushConsumer.java   | 249 ++++++++------
 .../event/SubscriptionEventBinaryCache.java        |   2 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |   2 +-
 .../meta/consumer/ConsumerGroupMetaKeeper.java     |   2 +-
 .../subscription/meta/consumer/ConsumerMeta.java   |   2 +-
 21 files changed, 732 insertions(+), 433 deletions(-)

diff --git 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index 383b16a345a..62c869e02bf 100644
--- 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++ 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 7eccef3f76a..07d5b4ed791 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
@@ -900,7 +900,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
   }
 
   private SubscriptionPullConsumer createConsumerAndSubscribeTopics(
-      final SubscriptionInfo subscriptionInfo) throws Exception {
+      final SubscriptionInfo subscriptionInfo) {
     final SubscriptionPullConsumer consumer =
         new SubscriptionPullConsumer.Builder()
             .host(senderEnv.getIP())
@@ -908,6 +908,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
             .consumerId(subscriptionInfo.consumerId)
             .consumerGroupId(subscriptionInfo.consumerGroupId)
             .autoCommit(false)
+            .fileSaveDir(System.getProperty("java.io.tmpdir")) // hack for 
license check
             .buildPullConsumer();
     consumer.open();
     consumer.subscribe(subscriptionInfo.topicNames);
@@ -924,7 +925,6 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
     final List<Thread> threads = new ArrayList<>();
     for (int i = 0; i < consumers.size(); ++i) {
       final int index = i;
-      final String consumerId = consumers.get(index).getConsumerId();
       final String consumerGroupId = consumers.get(index).getConsumerGroupId();
       final Thread t =
           new Thread(
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index f2d4e2191bf..7091c93b4db 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 
@@ -578,6 +578,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                       .consumerId("c1")
                       .consumerGroupId("cg1")
                       .autoCommit(false)
+                      .fileSaveDir(System.getProperty("java.io.tmpdir")) // 
hack for license check
                       .buildPullConsumer()) {
                 consumer.open();
                 consumer.subscribe(topicName);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 88a8c15dbd1..46b697e44db 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -24,12 +24,12 @@ import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
-import org.apache.iotdb.session.subscription.AckStrategy;
-import org.apache.iotdb.session.subscription.AsyncCommitCallback;
-import org.apache.iotdb.session.subscription.ConsumeResult;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
-import org.apache.iotdb.session.subscription.SubscriptionPushConsumer;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.AckStrategy;
+import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
+import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionFileHandler;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
@@ -210,6 +210,7 @@ public class IoTDBSubscriptionBasicIT {
                       .consumerId("c1")
                       .consumerGroupId("cg1")
                       .autoCommit(false)
+                      .fileSaveDir(System.getProperty("java.io.tmpdir")) // 
hack for license check
                       .buildPullConsumer()) {
                 consumer.open();
                 consumer.subscribe(topicName);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
index f97d642b384..b671acd6709 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.subscription.it.local;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 
 import org.junit.After;
 import org.junit.Assert;
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 2b13acdc6d0..231845921e1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index efb0e53a649..d449b2f9741 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -35,36 +35,42 @@ public class ConsumerConstant {
   public static final String CONSUMER_ID_KEY = "consumer-id";
   public static final String CONSUMER_GROUP_ID_KEY = "group-id";
 
-  public static final String HEARTBEAT_INTERVAL_MS_KEY = 
"heartbeat-interval-ms"; // unit: ms
-  public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000;
-  public static final long HEARTBEAT_INTERVAL_MS_MIN_VALUE = 1_000;
+  public static final String HEARTBEAT_INTERVAL_MS_KEY = 
"heartbeat-interval-ms";
+  public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000L;
+  public static final long HEARTBEAT_INTERVAL_MS_MIN_VALUE = 1_000L;
 
-  public static final String ENDPOINTS_SYNC_INTERVAL_MS_KEY =
-      "endpoints-sync-interval-ms"; // unit: ms
-  public static final long ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE = 120_000;
-  public static final long ENDPOINTS_SYNC_INTERVAL_MS_MIN_VALUE = 5_000;
+  public static final String ENDPOINTS_SYNC_INTERVAL_MS_KEY = 
"endpoints-sync-interval-ms";
+  public static final long ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE = 120_000L;
+  public static final long ENDPOINTS_SYNC_INTERVAL_MS_MIN_VALUE = 5_000L;
 
   public static final String FILE_SAVE_DIR_KEY = "file-save-dir";
   public static final String FILE_SAVE_DIR_DEFAULT_VALUE =
       Paths.get(System.getProperty("user.dir"), 
"iotdb-subscription").toString();
 
+  public static final String FILE_SAVE_FSYNC_KEY = "file-save-fsync";
+  public static final boolean FILE_SAVE_FSYNC_DEFAULT_VALUE = false;
+
   /////////////////////////////// pull consumer ///////////////////////////////
 
   public static final String AUTO_COMMIT_KEY = "auto-commit";
   public static final boolean AUTO_COMMIT_DEFAULT_VALUE = true;
 
-  public static final String AUTO_COMMIT_INTERVAL_MS_KEY = 
"auto-commit-interval-ms"; // unit: ms
-  public static final long AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 5_000;
-  public static final long AUTO_COMMIT_INTERVAL_MS_MIN_VALUE = 500;
+  public static final String AUTO_COMMIT_INTERVAL_MS_KEY = 
"auto-commit-interval-ms";
+  public static final long AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 5_000L;
+  public static final long AUTO_COMMIT_INTERVAL_MS_MIN_VALUE = 500L;
 
   /////////////////////////////// push consumer ///////////////////////////////
 
   public static final String ACK_STRATEGY_KEY = "ack-strategy";
   public static final String CONSUME_LISTENER_KEY = "consume-listener";
 
-  // TODO: configure those parameters
-  public static final int PUSH_CONSUMER_AUTO_POLL_INTERVAL_MS = 5_000;
-  public static final int PUSH_CONSUMER_AUTO_POLL_TIME_OUT_MS = 10_000;
+  public static final String AUTO_POLL_INTERVAL_MS_KEY = 
"auto-poll-interval-ms";
+  public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 5_000L;
+  public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 500L;
+
+  public static final String AUTO_POLL_TIMEOUT_MS_KEY = "auto-poll-timeout-ms";
+  public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 10_000L;
+  public static final long AUTO_POLL_TIMEOUT_MS_MIN_VALUE = 1_000L;
 
   private ConsumerConstant() {
     throw new IllegalStateException("Utility class");
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AckStrategy.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AckStrategy.java
similarity index 94%
rename from 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AckStrategy.java
rename to 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AckStrategy.java
index 217d50ffd02..81f29e2497f 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AckStrategy.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AckStrategy.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 public enum AckStrategy {
   BEFORE_CONSUME,
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AsyncCommitCallback.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AsyncCommitCallback.java
similarity index 94%
rename from 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AsyncCommitCallback.java
rename to 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AsyncCommitCallback.java
index 52e0eb7e091..3fd7d91cba9 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AsyncCommitCallback.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/AsyncCommitCallback.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 public interface AsyncCommitCallback {
   default void onComplete() {
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeListener.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeListener.java
similarity index 94%
rename from 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeListener.java
rename to 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeListener.java
index e26d7986ae7..39488e0222e 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeListener.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeListener.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeResult.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeResult.java
similarity index 93%
rename from 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeResult.java
rename to 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeResult.java
index 63bf701a02d..c674c6ce615 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumeResult.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/ConsumeResult.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 public enum ConsumeResult {
   SUCCESS,
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
similarity index 83%
rename from 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
rename to 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 58655aa2f09..2bbd4a129a9 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.SessionConfig;
@@ -66,14 +66,11 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
 
-public abstract class SubscriptionConsumer implements AutoCloseable {
+abstract class SubscriptionConsumer implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionConsumer.class);
 
@@ -88,42 +85,12 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
   private final long heartbeatIntervalMs;
   private final long endpointsSyncIntervalMs;
 
-  private final SubscriptionProviders subscriptionProviders;
-
-  private ScheduledExecutorService heartbeatWorkerExecutor;
-  private ScheduledExecutorService endpointsSyncerExecutor;
-
-  private ExecutorService asyncCommitExecutor;
+  private final SubscriptionProviders providers;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(true);
 
   private final String fileSaveDir;
-
-  private Path getFileDir(final String topicName) throws IOException {
-    final Path dirPath =
-        
Paths.get(fileSaveDir).resolve(consumerGroupId).resolve(consumerId).resolve(topicName);
-    Files.createDirectories(dirPath);
-    return dirPath;
-  }
-
-  private Path getFilePath(final String topicName, String fileName) throws 
SubscriptionException {
-    Path filePath;
-    try {
-      filePath = getFileDir(topicName).resolve(fileName);
-      Files.createFile(filePath);
-    } catch (final FileAlreadyExistsException fileAlreadyExistsException) {
-      fileName += "." + RandomStringGenerator.generate(16);
-      try {
-        filePath = getFileDir(topicName).resolve(fileName);
-        Files.createFile(filePath);
-      } catch (final IOException e) {
-        throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
-      }
-    } catch (final IOException e) {
-      throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
-    }
-    return filePath;
-  }
+  private final boolean fileSaveFsync;
 
   public String getConsumerId() {
     return consumerId;
@@ -136,7 +103,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
   /////////////////////////////// ctor ///////////////////////////////
 
   protected SubscriptionConsumer(final Builder builder) {
-    final List<TEndPoint> initialEndpoints = new ArrayList<>();
+    final Set<TEndPoint> initialEndpoints = new HashSet<>();
     // From org.apache.iotdb.session.Session.getNodeUrls
     // Priority is given to `host:port` over `nodeUrls`.
     if (Objects.nonNull(builder.host)) {
@@ -144,7 +111,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
     } else {
       
initialEndpoints.addAll(SessionUtils.parseSeedNodeUrls(builder.nodeUrls));
     }
-    this.subscriptionProviders = new SubscriptionProviders(initialEndpoints);
+    this.providers = new SubscriptionProviders(initialEndpoints);
 
     this.username = builder.username;
     this.password = builder.password;
@@ -156,6 +123,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
     this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs;
 
     this.fileSaveDir = builder.fileSaveDir;
+    this.fileSaveFsync = builder.fileSaveFsync;
   }
 
   protected SubscriptionConsumer(final Builder builder, final Properties 
properties) {
@@ -192,7 +160,12 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
                 (String)
                     properties.getOrDefault(
                         ConsumerConstant.FILE_SAVE_DIR_KEY,
-                        ConsumerConstant.FILE_SAVE_DIR_DEFAULT_VALUE)));
+                        ConsumerConstant.FILE_SAVE_DIR_DEFAULT_VALUE))
+            .fileSaveFsync(
+                (Boolean)
+                    properties.getOrDefault(
+                        ConsumerConstant.FILE_SAVE_FSYNC_KEY,
+                        ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE)));
   }
 
   /////////////////////////////// open & close ///////////////////////////////
@@ -203,18 +176,18 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
     }
 
     // open subscription providers
-    subscriptionProviders.acquireWriteLock();
+    providers.acquireWriteLock();
     try {
-      subscriptionProviders.openProviders(this); // throw SubscriptionException
+      providers.openProviders(this); // throw SubscriptionException
     } finally {
-      subscriptionProviders.releaseWriteLock();
+      providers.releaseWriteLock();
     }
 
-    // launch heartbeat worker
-    launchHeartbeatWorker();
+    // submit heartbeat worker
+    submitHeartbeatWorker();
 
-    // launch endpoints syncer
-    launchEndpointsSyncer();
+    // submit endpoints syncer
+    submitEndpointsSyncer();
 
     isClosed.set(false);
   }
@@ -225,26 +198,12 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
       return;
     }
 
-    try {
-      // shutdown endpoints syncer
-      shutdownEndpointsSyncer();
-
-      // shutdown heartbeat worker
-      shutdownHeartbeatWorker();
-
-      // shutdown async commit worker if needed
-      shutdownAsyncCommitWorkerIfNeeded();
+    // close subscription providers
+    providers.acquireWriteLock();
+    providers.closeProviders();
+    providers.releaseWriteLock();
 
-      // close subscription providers
-      subscriptionProviders.acquireWriteLock();
-      try {
-        subscriptionProviders.closeProviders();
-      } finally {
-        subscriptionProviders.releaseWriteLock();
-      }
-    } finally {
-      isClosed.set(true);
-    }
+    isClosed.set(true);
   }
 
   boolean isClosed() {
@@ -262,11 +221,11 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
   }
 
   public void subscribe(final Set<String> topicNames) throws 
SubscriptionException {
-    subscriptionProviders.acquireReadLock();
+    providers.acquireReadLock();
     try {
       subscribeWithRedirection(topicNames);
     } finally {
-      subscriptionProviders.releaseReadLock();
+      providers.releaseReadLock();
     }
   }
 
@@ -279,74 +238,14 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
   }
 
   public void unsubscribe(final Set<String> topicNames) throws 
SubscriptionException {
-    subscriptionProviders.acquireReadLock();
+    providers.acquireReadLock();
     try {
       unsubscribeWithRedirection(topicNames);
     } finally {
-      subscriptionProviders.releaseReadLock();
+      providers.releaseReadLock();
     }
   }
 
-  /////////////////////////////// heartbeat ///////////////////////////////
-
-  @SuppressWarnings("unsafeThreadSchedule")
-  private void launchHeartbeatWorker() {
-    heartbeatWorkerExecutor =
-        Executors.newSingleThreadScheduledExecutor(
-            r -> {
-              final Thread t =
-                  new Thread(
-                      Thread.currentThread().getThreadGroup(), r, 
"ConsumerHeartbeatWorker", 0);
-              if (!t.isDaemon()) {
-                t.setDaemon(true);
-              }
-              if (t.getPriority() != Thread.NORM_PRIORITY) {
-                t.setPriority(Thread.NORM_PRIORITY);
-              }
-              return t;
-            });
-    heartbeatWorkerExecutor.scheduleWithFixedDelay(
-        () -> subscriptionProviders.heartbeat(this),
-        generateRandomInitialDelayMs(heartbeatIntervalMs),
-        heartbeatIntervalMs,
-        TimeUnit.MILLISECONDS);
-  }
-
-  private void shutdownHeartbeatWorker() {
-    heartbeatWorkerExecutor.shutdown();
-    heartbeatWorkerExecutor = null;
-  }
-
-  /////////////////////////////// sync endpoints 
///////////////////////////////
-
-  @SuppressWarnings("unsafeThreadSchedule")
-  private void launchEndpointsSyncer() {
-    endpointsSyncerExecutor =
-        Executors.newSingleThreadScheduledExecutor(
-            r -> {
-              final Thread t =
-                  new Thread(
-                      Thread.currentThread().getThreadGroup(), r, 
"SubscriptionEndpointsSyncer", 0);
-              if (!t.isDaemon()) {
-                t.setDaemon(true);
-              }
-              if (t.getPriority() != Thread.NORM_PRIORITY) {
-                t.setPriority(Thread.NORM_PRIORITY);
-              }
-              return t;
-            });
-    endpointsSyncerExecutor.scheduleWithFixedDelay(
-        () -> subscriptionProviders.sync(this),
-        generateRandomInitialDelayMs(endpointsSyncIntervalMs),
-        endpointsSyncIntervalMs,
-        TimeUnit.MILLISECONDS);
-  }
-
-  private void shutdownEndpointsSyncer() {
-    endpointsSyncerExecutor.shutdown();
-    endpointsSyncerExecutor = null;
-  }
-
   /////////////////////////////// subscription provider 
///////////////////////////////
 
   SubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint)
@@ -376,6 +275,34 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
     return provider;
   }
 
+  /////////////////////////////// file ops ///////////////////////////////
+
+  private Path getFileDir(final String topicName) throws IOException {
+    final Path dirPath =
+        
Paths.get(fileSaveDir).resolve(consumerGroupId).resolve(consumerId).resolve(topicName);
+    Files.createDirectories(dirPath);
+    return dirPath;
+  }
+
+  private Path getFilePath(final String topicName, String fileName) throws 
SubscriptionException {
+    Path filePath;
+    try {
+      filePath = getFileDir(topicName).resolve(fileName);
+      Files.createFile(filePath);
+    } catch (final FileAlreadyExistsException fileAlreadyExistsException) {
+      fileName += "." + RandomStringGenerator.generate(16);
+      try {
+        filePath = getFileDir(topicName).resolve(fileName);
+        Files.createFile(filePath);
+      } catch (final IOException e) {
+        throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+      }
+    } catch (final IOException e) {
+      throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+    }
+    return filePath;
+  }
+
   /////////////////////////////// poll ///////////////////////////////
 
   protected List<SubscriptionMessage> poll(final Set<String> topicNames, final 
long timeoutMs)
@@ -546,7 +473,9 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
 
             // write file piece
             fileWriter.write(((FilePiecePayload) payload).getFilePiece());
-            fileWriter.getFD().sync();
+            if (fileSaveFsync) {
+              fileWriter.getFD().sync();
+            }
 
             // check offset
             if (!Objects.equals(
@@ -599,8 +528,10 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
               throw new SubscriptionRuntimeNonCriticalException(errorMessage);
             }
 
-            // sync and close
-            fileWriter.getFD().sync();
+            // optional sync and close
+            if (fileSaveFsync) {
+              fileWriter.getFD().sync();
+            }
             fileWriter.close();
 
             LOGGER.info(
@@ -641,9 +572,9 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
 
   private List<SubscriptionPollResponse> pollInternal(final Set<String> 
topicNames)
       throws SubscriptionException {
-    subscriptionProviders.acquireReadLock();
+    providers.acquireReadLock();
     try {
-      final SubscriptionProvider provider = 
subscriptionProviders.getNextAvailableProvider();
+      final SubscriptionProvider provider = 
providers.getNextAvailableProvider();
       if (Objects.isNull(provider) || !provider.isAvailable()) {
         throw new SubscriptionConnectionException(
             String.format(
@@ -659,16 +590,16 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
         return Collections.emptyList();
       }
     } finally {
-      subscriptionProviders.releaseReadLock();
+      providers.releaseReadLock();
     }
   }
 
   private List<SubscriptionPollResponse> pollFileInternal(
       final int dataNodeId, final String topicName, final String fileName, 
final long writingOffset)
       throws SubscriptionException {
-    subscriptionProviders.acquireReadLock();
+    providers.acquireReadLock();
     try {
-      final SubscriptionProvider provider = 
subscriptionProviders.getProvider(dataNodeId);
+      final SubscriptionProvider provider = providers.getProvider(dataNodeId);
       if (Objects.isNull(provider) || !provider.isAvailable()) {
         throw new SubscriptionConnectionException(
             String.format(
@@ -686,7 +617,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
         return Collections.emptyList();
       }
     } finally {
-      subscriptionProviders.releaseReadLock();
+      providers.releaseReadLock();
     }
   }
 
@@ -725,9 +656,9 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
       final List<SubscriptionCommitContext> subscriptionCommitContexts,
       final boolean nack)
       throws SubscriptionException {
-    subscriptionProviders.acquireReadLock();
+    providers.acquireReadLock();
     try {
-      final SubscriptionProvider provider = 
subscriptionProviders.getProvider(dataNodeId);
+      final SubscriptionProvider provider = providers.getProvider(dataNodeId);
       if (Objects.isNull(provider) || !provider.isAvailable()) {
         throw new SubscriptionConnectionException(
             String.format(
@@ -736,27 +667,92 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
       }
       provider.commit(subscriptionCommitContexts, nack);
     } finally {
-      subscriptionProviders.releaseReadLock();
+      providers.releaseReadLock();
     }
   }
 
+  /////////////////////////////// heartbeat ///////////////////////////////
+
+  /**
+   * Schedules this consumer's periodic heartbeat through {@link
+   * SubscriptionExecutorServiceManager#submitHeartbeatWorker}.
+   *
+   * <p>The scheduled task cancels itself: when {@link #isClosed()} is true, a run cancels its own
+   * future (without interrupting) instead of calling {@code providers.heartbeat(this)}.
+   */
+  private void submitHeartbeatWorker() {
+    // one-element holder so the lambda can refer to the future it is scheduled under
+    final ScheduledFuture<?>[] future = new ScheduledFuture<?>[1];
+    future[0] =
+        SubscriptionExecutorServiceManager.submitHeartbeatWorker(
+            () -> {
+              if (isClosed()) {
+                // the holder can still be null here (e.g. the manager returned null); then this
+                // run is simply skipped
+                if (Objects.nonNull(future[0])) {
+                  future[0].cancel(false);
+                  LOGGER.info("SubscriptionConsumer {} cancel heartbeat 
worker", this);
+                }
+                return;
+              }
+              providers.heartbeat(this);
+            },
+            heartbeatIntervalMs);
+    LOGGER.info("SubscriptionConsumer {} submit heartbeat worker", this);
+  }
+
+  /////////////////////////////// sync endpoints 
///////////////////////////////
+
+  /**
+   * Schedules this consumer's periodic endpoints synchronization through {@link
+   * SubscriptionExecutorServiceManager#submitEndpointsSyncer}.
+   *
+   * <p>The scheduled task cancels itself: when {@link #isClosed()} is true, a run cancels its own
+   * future (without interrupting) instead of calling {@code providers.sync(this)}.
+   */
+  private void submitEndpointsSyncer() {
+    // one-element holder so the lambda can refer to the future it is scheduled under
+    final ScheduledFuture<?>[] future = new ScheduledFuture<?>[1];
+    future[0] =
+        SubscriptionExecutorServiceManager.submitEndpointsSyncer(
+            () -> {
+              if (isClosed()) {
+                // the holder can still be null here (e.g. the manager returned null); then this
+                // run is simply skipped
+                if (Objects.nonNull(future[0])) {
+                  future[0].cancel(false);
+                  LOGGER.info("SubscriptionConsumer {} cancel endpoints 
syncer", this);
+                }
+                return;
+              }
+              providers.sync(this);
+            },
+            endpointsSyncIntervalMs);
+    LOGGER.info("SubscriptionConsumer {} submit endpoints syncer", this);
+  }
+
   /////////////////////////////// commit async ///////////////////////////////
 
   protected void commitAsync(
       final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback 
callback) {
-    // launch async commit worker if needed
-    launchAsyncCommitWorkerIfNeeded();
+    SubscriptionExecutorServiceManager.submitAsyncCommitWorker(
+        new AsyncCommitWorker(messages, callback));
+  }
+
+  private class AsyncCommitWorker implements Runnable {
+
+    private final Iterable<SubscriptionMessage> messages;
+    private final AsyncCommitCallback callback;
+
+    public AsyncCommitWorker(
+        final Iterable<SubscriptionMessage> messages, final 
AsyncCommitCallback callback) {
+      this.messages = messages;
+      this.callback = callback;
+    }
+
+    @Override
+    public void run() {
+      if (isClosed()) {
+        return;
+      }
 
-    asyncCommitExecutor.submit(new AsyncCommitWorker(messages, callback));
+      try {
+        ack(messages);
+        callback.onComplete();
+      } catch (final Exception e) {
+        callback.onFailure(e);
+      }
+    }
   }
 
   protected CompletableFuture<Void> commitAsync(final 
Iterable<SubscriptionMessage> messages) {
-    // launch async commit worker if needed
-    launchAsyncCommitWorkerIfNeeded();
-
     final CompletableFuture<Void> future = new CompletableFuture<>();
-    asyncCommitExecutor.submit(
+    SubscriptionExecutorServiceManager.submitAsyncCommitWorker(
         () -> {
+          if (isClosed()) {
+            return;
+          }
+
           try {
             ack(messages);
             future.complete(null);
@@ -770,7 +766,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
   /////////////////////////////// redirection ///////////////////////////////
 
   private void subscribeWithRedirection(final Set<String> topicNames) throws 
SubscriptionException {
-    final List<SubscriptionProvider> providers = 
subscriptionProviders.getAllAvailableProviders();
+    final List<SubscriptionProvider> providers = 
this.providers.getAllAvailableProviders();
     if (providers.isEmpty()) {
       throw new SubscriptionConnectionException(
           String.format(
@@ -800,7 +796,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
 
   private void unsubscribeWithRedirection(final Set<String> topicNames)
       throws SubscriptionException {
-    final List<SubscriptionProvider> providers = 
subscriptionProviders.getAllAvailableProviders();
+    final List<SubscriptionProvider> providers = 
this.providers.getAllAvailableProviders();
     if (providers.isEmpty()) {
       throw new SubscriptionConnectionException(
           String.format(
@@ -829,7 +825,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
   }
 
   Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection() throws 
SubscriptionException {
-    final List<SubscriptionProvider> providers = 
subscriptionProviders.getAllAvailableProviders();
+    final List<SubscriptionProvider> providers = 
this.providers.getAllAvailableProviders();
     if (providers.isEmpty()) {
       throw new SubscriptionConnectionException(
           String.format(
@@ -873,6 +869,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
         ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE;
 
     protected String fileSaveDir = 
ConsumerConstant.FILE_SAVE_DIR_DEFAULT_VALUE;
+    protected boolean fileSaveFsync = 
ConsumerConstant.FILE_SAVE_FSYNC_DEFAULT_VALUE;
 
     public Builder host(final String host) {
       this.host = host;
@@ -926,71 +923,14 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
       return this;
     }
 
-    public abstract SubscriptionPullConsumer buildPullConsumer();
-
-    public abstract SubscriptionPushConsumer buildPushConsumer();
-  }
-
-  /////////////////////////////// commit async worker 
///////////////////////////////
-
-  private void launchAsyncCommitWorkerIfNeeded() {
-    if (asyncCommitExecutor == null) {
-      synchronized (this) {
-        if (asyncCommitExecutor != null) {
-          return;
-        }
-
-        asyncCommitExecutor =
-            Executors.newSingleThreadExecutor(
-                r -> {
-                  final Thread t =
-                      new Thread(
-                          Thread.currentThread().getThreadGroup(),
-                          r,
-                          "SubscriptionConsumerAsyncCommitWorker",
-                          0);
-                  if (!t.isDaemon()) {
-                    t.setDaemon(true);
-                  }
-                  if (t.getPriority() != Thread.NORM_PRIORITY) {
-                    t.setPriority(Thread.NORM_PRIORITY);
-                  }
-                  return t;
-                });
-      }
-    }
-  }
-
-  private void shutdownAsyncCommitWorkerIfNeeded() {
-    if (asyncCommitExecutor != null) {
-      asyncCommitExecutor.shutdown();
-      asyncCommitExecutor = null;
-    }
-  }
-
-  class AsyncCommitWorker implements Runnable {
-    private final Iterable<SubscriptionMessage> messages;
-    private final AsyncCommitCallback callback;
-
-    public AsyncCommitWorker(
-        final Iterable<SubscriptionMessage> messages, final 
AsyncCommitCallback callback) {
-      this.messages = messages;
-      this.callback = callback;
+    public Builder fileSaveFsync(final boolean fileSaveFsync) {
+      this.fileSaveFsync = fileSaveFsync;
+      return this;
     }
 
-    @Override
-    public void run() {
-      if (isClosed()) {
-        return;
-      }
+    public abstract SubscriptionPullConsumer buildPullConsumer();
 
-      try {
-        ack(messages);
-        callback.onComplete();
-      } catch (final Exception e) {
-        callback.onFailure(e);
-      }
-    }
+    public abstract SubscriptionPushConsumer buildPushConsumer();
   }
 
   /////////////////////////////// object ///////////////////////////////
@@ -1003,10 +943,4 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
         + consumerGroupId
         + "}";
   }
-
-  /////////////////////////////// utility ///////////////////////////////
-
-  protected long generateRandomInitialDelayMs(final long maxMs) {
-    return (long) (Math.random() * maxMs);
-  }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
new file mode 100644
index 00000000000..5a587ff96fb
--- /dev/null
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionExecutorServiceManager.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+final class SubscriptionExecutorServiceManager {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SubscriptionExecutorServiceManager.class);
+
+  private static final long AWAIT_TERMINATION_TIMEOUT_MS = 10_000L;
+
+  private static final String CONTROL_FLOW_EXECUTOR_NAME = 
"SubscriptionControlFlowExecutor";
+  private static final String UPSTREAM_DATA_FLOW_EXECUTOR_NAME =
+      "SubscriptionUpstreamDataFlowExecutor";
+  private static final String DOWNSTREAM_DATA_FLOW_EXECUTOR_NAME =
+      "SubscriptionDownstreamDataFlowExecutor";
+
+  /**
+   * Control Flow Executor: execute heartbeat worker and endpoints syncer for 
{@link
+   * SubscriptionConsumer}
+   */
+  private static final SubscriptionExecutorService CONTROL_FLOW_EXECUTOR =
+      new SubscriptionExecutorService(
+          CONTROL_FLOW_EXECUTOR_NAME, 
Math.max(Runtime.getRuntime().availableProcessors() / 2, 1));
+
+  /**
+   * Upstream Data Flow Executor: execute auto commit worker and async commit 
worker for {@link
+   * SubscriptionPullConsumer}
+   */
+  private static final SubscriptionExecutorService UPSTREAM_DATA_FLOW_EXECUTOR 
=
+      new SubscriptionExecutorService(
+          UPSTREAM_DATA_FLOW_EXECUTOR_NAME,
+          Math.max(Runtime.getRuntime().availableProcessors() / 2, 1));
+
+  /**
+   * Downstream Data Flow Executor: execute auto poll worker for {@link 
SubscriptionPushConsumer}
+   */
+  private static final SubscriptionExecutorService 
DOWNSTREAM_DATA_FLOW_EXECUTOR =
+      new SubscriptionExecutorService(
+          DOWNSTREAM_DATA_FLOW_EXECUTOR_NAME,
+          Math.max(Runtime.getRuntime().availableProcessors(), 1));
+
+  /////////////////////////////// set core pool size 
///////////////////////////////
+
+  /**
+   * Forwards a core pool size request to the control flow executor (heartbeat worker and endpoints
+   * syncer).
+   *
+   * @param corePoolSize the requested core pool size
+   */
+  public static void setControlFlowExecutorCorePoolSize(final int 
corePoolSize) {
+    CONTROL_FLOW_EXECUTOR.setCorePoolSize(corePoolSize);
+  }
+
+  /**
+   * Forwards a core pool size request to the upstream data flow executor (auto commit worker and
+   * async commit worker).
+   *
+   * @param corePoolSize the requested core pool size
+   */
+  public static void setUpstreamDataFlowExecutorCorePoolSize(final int 
corePoolSize) {
+    UPSTREAM_DATA_FLOW_EXECUTOR.setCorePoolSize(corePoolSize);
+  }
+
+  /**
+   * Forwards a core pool size request to the downstream data flow executor (auto poll worker).
+   *
+   * @param corePoolSize the requested core pool size
+   */
+  public static void setDownstreamDataFlowExecutorCorePoolSize(final int 
corePoolSize) {
+    DOWNSTREAM_DATA_FLOW_EXECUTOR.setCorePoolSize(corePoolSize);
+  }
+
+  /////////////////////////////// shutdown hook ///////////////////////////////
+
+  static {
+    // register shutdown hook
+    // (runs once, at class initialization; the hook thread executes
+    // SubscriptionExecutorServiceShutdownHook when the JVM shuts down)
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                new SubscriptionExecutorServiceShutdownHook(),
+                "SubscriptionExecutorServiceShutdownHook"));
+  }
+
+  /**
+   * JVM shutdown hook body: shuts down the three shared executors one after another (control flow,
+   * upstream data flow, downstream data flow).
+   */
+  private static class SubscriptionExecutorServiceShutdownHook implements 
Runnable {
+
+    @Override
+    public void run() {
+      // shutdown executors
+      CONTROL_FLOW_EXECUTOR.shutdown();
+      UPSTREAM_DATA_FLOW_EXECUTOR.shutdown();
+      DOWNSTREAM_DATA_FLOW_EXECUTOR.shutdown();
+    }
+  }
+
+  /////////////////////////////// submitter ///////////////////////////////
+
+  /**
+   * Schedules a heartbeat task with fixed delay on the control flow executor, launching the
+   * executor first if it is not running.
+   *
+   * @param task the heartbeat task
+   * @param heartbeatIntervalMs delay between runs in milliseconds; also the upper bound of the
+   *     random initial delay
+   * @return the future of the scheduled task, or {@code null} if the executor was not running at
+   *     scheduling time
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> submitHeartbeatWorker(
+      final Runnable task, final long heartbeatIntervalMs) {
+    CONTROL_FLOW_EXECUTOR.launchIfNeeded();
+    final long initialDelayMs = generateRandomInitialDelayMs(heartbeatIntervalMs);
+    return CONTROL_FLOW_EXECUTOR.scheduleWithFixedDelay(
+        task, initialDelayMs, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Schedules an endpoints sync task with fixed delay on the control flow executor, launching the
+   * executor first if it is not running.
+   *
+   * @param task the endpoints sync task
+   * @param endpointsSyncIntervalMs delay between runs in milliseconds; also the upper bound of the
+   *     random initial delay
+   * @return the future of the scheduled task, or {@code null} if the executor was not running at
+   *     scheduling time
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> submitEndpointsSyncer(
+      final Runnable task, final long endpointsSyncIntervalMs) {
+    CONTROL_FLOW_EXECUTOR.launchIfNeeded();
+    final long initialDelayMs = generateRandomInitialDelayMs(endpointsSyncIntervalMs);
+    return CONTROL_FLOW_EXECUTOR.scheduleWithFixedDelay(
+        task, initialDelayMs, endpointsSyncIntervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Schedules an auto commit task with fixed delay on the upstream data flow executor, launching
+   * the executor first if it is not running.
+   *
+   * @param task the auto commit task
+   * @param autoCommitIntervalMs delay between runs in milliseconds; also the upper bound of the
+   *     random initial delay
+   * @return the future of the scheduled task, or {@code null} if the executor was not running at
+   *     scheduling time
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> submitAutoCommitWorker(
+      final Runnable task, final long autoCommitIntervalMs) {
+    UPSTREAM_DATA_FLOW_EXECUTOR.launchIfNeeded();
+    final long initialDelayMs = generateRandomInitialDelayMs(autoCommitIntervalMs);
+    return UPSTREAM_DATA_FLOW_EXECUTOR.scheduleWithFixedDelay(
+        task, initialDelayMs, autoCommitIntervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Submits a one-shot async commit task to the upstream data flow executor, launching the executor
+   * first if it is not running. The future returned by the executor is discarded.
+   *
+   * @param task the async commit task
+   */
+  public static void submitAsyncCommitWorker(final Runnable task) {
+    UPSTREAM_DATA_FLOW_EXECUTOR.launchIfNeeded();
+    UPSTREAM_DATA_FLOW_EXECUTOR.submit(task);
+  }
+
+  /**
+   * Schedules an auto poll task with fixed delay on the downstream data flow executor, launching
+   * the executor first if it is not running.
+   *
+   * @param task the auto poll task
+   * @param autoPollIntervalMs delay between runs in milliseconds; also the upper bound of the
+   *     random initial delay
+   * @return the future of the scheduled task, or {@code null} if the executor was not running at
+   *     scheduling time
+   */
+  @SuppressWarnings("unsafeThreadSchedule")
+  public static ScheduledFuture<?> submitAutoPollWorker(
+      final Runnable task, final long autoPollIntervalMs) {
+    DOWNSTREAM_DATA_FLOW_EXECUTOR.launchIfNeeded();
+    final long initialDelayMs = generateRandomInitialDelayMs(autoPollIntervalMs);
+    return DOWNSTREAM_DATA_FLOW_EXECUTOR.scheduleWithFixedDelay(
+        task, initialDelayMs, autoPollIntervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /////////////////////////////// subscription executor service 
///////////////////////////////
+
+  /**
+   * Lazily launched holder of one shared {@link ScheduledExecutorService}.
+   *
+   * <p>A {@code null} {@link #executor} means "not running" (never launched, or already shut down).
+   * Every state change happens inside {@code synchronized (this)} after an unsynchronized pre-check
+   * of the volatile field.
+   */
+  private static class SubscriptionExecutorService {
+
+    /** Executor name; also used as the name of every worker thread it creates. */
+    final String name;
+
+    /** Core pool size used the next time the executor is launched. */
+    volatile int corePoolSize;
+
+    /** The running executor, or {@code null} while not running. */
+    volatile ScheduledExecutorService executor;
+
+    SubscriptionExecutorService(final String name, final int corePoolSize) {
+      this.name = name;
+      this.corePoolSize = corePoolSize;
+    }
+
+    /** Returns {@code true} while no executor instance exists. */
+    boolean isShutdown() {
+      return Objects.isNull(this.executor);
+    }
+
+    /**
+     * Changes the core pool size used by the next {@link #launchIfNeeded()}.
+     *
+     * <p>The size is only read when the pool is created, so the new value is accepted while the
+     * executor is not running; once the executor has been launched the request is logged and
+     * ignored.
+     *
+     * @param corePoolSize the core pool size to launch with
+     */
+    void setCorePoolSize(final int corePoolSize) {
+      // accept the value only while the executor has NOT been created yet
+      if (isShutdown()) {
+        synchronized (this) {
+          if (isShutdown()) {
+            this.corePoolSize = corePoolSize;
+            return;
+          }
+        }
+      }
+
+      LOGGER.warn(
+          "{} has been launched, set core pool size to {} will be ignored",
+          this.name,
+          corePoolSize);
+    }
+
+    /**
+     * Creates the scheduled thread pool with the current {@link #corePoolSize} if none exists.
+     * Worker threads are daemon threads with normal priority, all named after this executor.
+     */
+    void launchIfNeeded() {
+      if (isShutdown()) {
+        synchronized (this) {
+          if (isShutdown()) {
+            LOGGER.info("Launching {} with core pool size {}...", this.name, this.corePoolSize);
+
+            this.executor =
+                Executors.newScheduledThreadPool(
+                    this.corePoolSize,
+                    r -> {
+                      final Thread t =
+                          new Thread(Thread.currentThread().getThreadGroup(), r, this.name, 0);
+                      if (!t.isDaemon()) {
+                        t.setDaemon(true);
+                      }
+                      if (t.getPriority() != Thread.NORM_PRIORITY) {
+                        t.setPriority(Thread.NORM_PRIORITY);
+                      }
+                      return t;
+                    });
+          }
+        }
+      }
+    }
+
+    /**
+     * Shuts the executor down if it is running: orderly shutdown first, then {@code shutdownNow()}
+     * if tasks do not terminate within {@code AWAIT_TERMINATION_TIMEOUT_MS}. Afterwards the
+     * executor reference is cleared, so this holder reports {@link #isShutdown()} again.
+     */
+    void shutdown() {
+      if (!isShutdown()) {
+        synchronized (this) {
+          if (!isShutdown()) {
+            LOGGER.info("Shutting down {}...", this.name);
+
+            this.executor.shutdown();
+            try {
+              if (!this.executor.awaitTermination(
+                  AWAIT_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+                this.executor.shutdownNow();
+                LOGGER.warn(
+                    "Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");
+                if (!this.executor.awaitTermination(
+                    AWAIT_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+                  LOGGER.error(
+                      "Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
+                }
+              }
+            } catch (final InterruptedException e) {
+              this.executor.shutdownNow();
+              LOGGER.error(
+                  "The current thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs.");
+              // restore the interrupt flag for the caller
+              Thread.currentThread().interrupt();
+            }
+
+            this.executor = null;
+          }
+        }
+      }
+    }
+
+    /**
+     * Schedules {@code task} with fixed delay on the running executor.
+     *
+     * @return the future of the scheduled task, or {@code null} (after a warning) if the executor
+     *     is not running
+     */
+    @SuppressWarnings("unsafeThreadSchedule")
+    ScheduledFuture<?> scheduleWithFixedDelay(
+        final Runnable task, final long initialDelay, final long delay, final TimeUnit unit) {
+      if (!isShutdown()) {
+        synchronized (this) {
+          if (!isShutdown()) {
+            return this.executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
+          }
+        }
+      }
+
+      LOGGER.warn("{} has not been launched, ignore scheduleWithFixedDelay for task", this.name);
+      return null;
+    }
+
+    /**
+     * Submits a one-shot {@code task} to the running executor.
+     *
+     * @return the future of the submitted task, or {@code null} (after a warning) if the executor
+     *     is not running
+     */
+    Future<?> submit(final Runnable task) {
+      if (!isShutdown()) {
+        synchronized (this) {
+          if (!isShutdown()) {
+            return this.executor.submit(task);
+          }
+        }
+      }
+
+      LOGGER.warn("{} has not been launched, ignore submit task", this.name);
+      return null;
+    }
+  }
+
+  /////////////////////////////// utility ///////////////////////////////
+
+  /**
+   * Picks a pseudo-random initial delay by truncating {@code Math.random() * maxMs} to a long, so
+   * periodic tasks submitted together do not all fire at the same instant.
+   *
+   * @param maxMs exclusive upper bound of the delay in milliseconds (for positive values)
+   * @return the delay in milliseconds
+   */
+  private static long generateRandomInitialDelayMs(final long maxMs) {
+    final double scaled = maxMs * Math.random();
+    return (long) scaled;
+  }
+}
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
similarity index 98%
rename from 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
rename to 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index cccc7cdb389..70589472f82 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -41,6 +41,8 @@ import 
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscrib
 import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
 import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.SubscriptionSessionConnection;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProviders.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
similarity index 94%
rename from 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProviders.java
rename to 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
index 8038dc724d6..6d250f6d201 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProviders.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -46,9 +47,9 @@ final class SubscriptionProviders {
 
   private final ReentrantReadWriteLock subscriptionProvidersLock = new 
ReentrantReadWriteLock(true);
 
-  private final List<TEndPoint> initialEndpoints;
+  private final Set<TEndPoint> initialEndpoints;
 
-  SubscriptionProviders(final List<TEndPoint> initialEndpoints) {
+  SubscriptionProviders(final Set<TEndPoint> initialEndpoints) {
     this.initialEndpoints = initialEndpoints;
   }
 
@@ -336,4 +337,18 @@ final class SubscriptionProviders {
       }
     }
   }
+
+  /**
+   * Renders the currently registered providers as {@code SubscriptionProviders{p1, p2, ...}}.
+   *
+   * <p>The separator is emitted while iterating, so the map is never queried a second time to
+   * decide whether a trailing separator must be stripped; the result stays well-formed even if
+   * {@code subscriptionProviders} changes while this method runs.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("SubscriptionProviders{");
+    String separator = "";
+    for (final SubscriptionProvider provider : subscriptionProviders.values()) {
+      sb.append(separator).append(provider);
+      separator = ", ";
+    }
+    return sb.append('}').toString();
+  }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
similarity index 79%
rename from 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
rename to 
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 25315f8c624..2306e4d05c1 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -30,17 +30,28 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+/**
+ * The {@link SubscriptionPullConsumer} corresponds to the pull consumption 
mode in the message
+ * queue.
+ *
+ * <p>User code needs to actively call the data retrieval logic, i.e., the 
{@link #poll} method.
+ *
+ * <p>Auto-commit for consumption progress can be configured in {@link 
#autoCommit}.
+ *
+ * <p>NOTE: It is not recommended to use the {@link #poll} method with the 
same consumer in a
+ * multithreaded environment. Instead, it is advised to increase the number of 
consumers to improve
+ * data retrieval parallelism.
+ */
 public class SubscriptionPullConsumer extends SubscriptionConsumer {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionPullConsumer.class);
@@ -48,7 +59,6 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
   private final boolean autoCommit;
   private final long autoCommitIntervalMs;
 
-  private ScheduledExecutorService autoCommitWorkerExecutor;
   private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(true);
@@ -86,7 +96,8 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
         properties);
 
     this.autoCommit = autoCommit;
-    this.autoCommitIntervalMs = autoCommitIntervalMs;
+    this.autoCommitIntervalMs =
+        Math.max(autoCommitIntervalMs, 
ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_MIN_VALUE);
   }
 
   /////////////////////////////// open & close ///////////////////////////////
@@ -99,7 +110,8 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
     super.open();
 
     if (autoCommit) {
-      launchAutoCommitWorker();
+      uncommittedMessages = new ConcurrentSkipListMap<>();
+      submitAutoCommitWorker();
     }
 
     isClosed.set(false);
@@ -111,35 +123,31 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
       return;
     }
 
-    try {
-      if (autoCommit) {
-        // shutdown auto commit worker
-        shutdownAutoCommitWorker();
-
-        // commit all uncommitted messages
-        commitAllUncommittedMessages();
-      }
-      super.close();
-    } finally {
-      isClosed.set(true);
+    if (autoCommit) {
+      // commit all uncommitted messages
+      commitAllUncommittedMessages();
     }
+
+    super.close();
+    isClosed.set(true);
   }
 
   /////////////////////////////// poll & commit ///////////////////////////////
 
-  public List<SubscriptionMessage> poll(final Duration timeoutMs) throws 
SubscriptionException {
-    return poll(Collections.emptySet(), timeoutMs.toMillis());
+  public List<SubscriptionMessage> poll(final Duration timeout) throws 
SubscriptionException {
+    return poll(Collections.emptySet(), timeout.toMillis());
   }
 
   public List<SubscriptionMessage> poll(final long timeoutMs) throws 
SubscriptionException {
     return poll(Collections.emptySet(), timeoutMs);
   }
 
-  public List<SubscriptionMessage> poll(final Set<String> topicNames, final 
Duration timeoutMs)
+  public List<SubscriptionMessage> poll(final Set<String> topicNames, final 
Duration timeout)
       throws SubscriptionException {
-    return poll(topicNames, timeoutMs.toMillis());
+    return poll(topicNames, timeout.toMillis());
   }
 
+  @Override
   public List<SubscriptionMessage> poll(final Set<String> topicNames, final 
long timeoutMs)
       throws SubscriptionException {
     final List<SubscriptionMessage> messages = super.poll(topicNames, 
timeoutMs);
@@ -189,36 +197,47 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
 
   /////////////////////////////// auto commit ///////////////////////////////
 
-  @SuppressWarnings("unsafeThreadSchedule")
-  private void launchAutoCommitWorker() {
-    uncommittedMessages = new ConcurrentSkipListMap<>();
-    autoCommitWorkerExecutor =
-        Executors.newSingleThreadScheduledExecutor(
-            r -> {
-              final Thread t =
-                  new Thread(
-                      Thread.currentThread().getThreadGroup(),
-                      r,
-                      "PullConsumerAutoCommitWorker",
-                      0);
-              if (!t.isDaemon()) {
-                t.setDaemon(true);
+  private void submitAutoCommitWorker() {
+    final ScheduledFuture<?>[] future = new ScheduledFuture<?>[1];
+    future[0] =
+        SubscriptionExecutorServiceManager.submitAutoCommitWorker(
+            () -> {
+              if (isClosed()) {
+                if (Objects.nonNull(future[0])) {
+                  future[0].cancel(false);
+                  LOGGER.info("SubscriptionPullConsumer {} cancel auto commit 
worker", this);
+                }
+                return;
               }
-              if (t.getPriority() != Thread.NORM_PRIORITY) {
-                t.setPriority(Thread.NORM_PRIORITY);
-              }
-              return t;
-            });
-    autoCommitWorkerExecutor.scheduleWithFixedDelay(
-        new AutoCommitWorker(),
-        generateRandomInitialDelayMs(autoCommitIntervalMs),
-        autoCommitIntervalMs,
-        TimeUnit.MILLISECONDS);
+              new AutoCommitWorker().run();
+            },
+            autoCommitIntervalMs);
+    LOGGER.info("SubscriptionPullConsumer {} submit auto commit worker", this);
   }
 
-  private void shutdownAutoCommitWorker() {
-    autoCommitWorkerExecutor.shutdown();
-    autoCommitWorkerExecutor = null;
+  private class AutoCommitWorker implements Runnable {
+    @Override
+    public void run() {
+      if (isClosed()) {
+        return;
+      }
+
+      final long currentTimestamp = System.currentTimeMillis();
+      long index = currentTimestamp / autoCommitIntervalMs;
+      if (currentTimestamp % autoCommitIntervalMs == 0) {
+        index -= 1;
+      }
+
+      for (final Map.Entry<Long, Set<SubscriptionMessage>> entry :
+          uncommittedMessages.headMap(index).entrySet()) {
+        try {
+          ack(entry.getValue());
+          uncommittedMessages.remove(entry.getKey());
+        } catch (final Exception e) {
+          LOGGER.warn("something unexpected happened when auto commit 
messages...", e);
+        }
+      }
+    }
   }
 
   private void commitAllUncommittedMessages() {
@@ -239,56 +258,72 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
     private boolean autoCommit = ConsumerConstant.AUTO_COMMIT_DEFAULT_VALUE;
     private long autoCommitIntervalMs = ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE;
 
+    @Override
     public Builder host(final String host) {
       super.host(host);
       return this;
     }
 
+    @Override
     public Builder port(final int port) {
       super.port(port);
       return this;
     }
 
+    @Override
     public Builder nodeUrls(final List<String> nodeUrls) {
       super.nodeUrls(nodeUrls);
       return this;
     }
 
+    @Override
     public Builder username(final String username) {
       super.username(username);
       return this;
     }
 
+    @Override
     public Builder password(final String password) {
       super.password(password);
       return this;
     }
 
+    @Override
     public Builder consumerId(final String consumerId) {
       super.consumerId(consumerId);
       return this;
     }
 
+    @Override
     public Builder consumerGroupId(final String consumerGroupId) {
       super.consumerGroupId(consumerGroupId);
       return this;
     }
 
+    @Override
     public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) {
       super.heartbeatIntervalMs(heartbeatIntervalMs);
       return this;
     }
 
+    @Override
     public Builder endpointsSyncIntervalMs(final long endpointsSyncIntervalMs) {
       super.endpointsSyncIntervalMs(endpointsSyncIntervalMs);
       return this;
     }
 
+    @Override
     public Builder fileSaveDir(final String fileSaveDir) {
       super.fileSaveDir(fileSaveDir);
       return this;
     }
 
+    @Override
+    public Builder fileSaveFsync(final boolean fileSaveFsync) {
+      super.fileSaveFsync(fileSaveFsync);
+      return this;
+    }
+
     public Builder autoCommit(final boolean autoCommit) {
       this.autoCommit = autoCommit;
       return this;
@@ -311,31 +346,4 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
           "SubscriptionPullConsumer.Builder do not support build push consumer.");
     }
   }
-
-  /////////////////////////////// auto commit worker ///////////////////////////////
-
-  class AutoCommitWorker implements Runnable {
-    @Override
-    public void run() {
-      if (isClosed()) {
-        return;
-      }
-
-      final long currentTimestamp = System.currentTimeMillis();
-      long index = currentTimestamp / autoCommitIntervalMs;
-      if (currentTimestamp % autoCommitIntervalMs == 0) {
-        index -= 1;
-      }
-
-      for (final Map.Entry<Long, Set<SubscriptionMessage>> entry :
-          uncommittedMessages.headMap(index).entrySet()) {
-        try {
-          ack(entry.getValue());
-          uncommittedMessages.remove(entry.getKey());
-        } catch (final Exception e) {
-          LOGGER.warn("something unexpected happened when auto commit messages...", e);
-        }
-      }
-    }
-  }
 }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
similarity index 57%
rename from iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
rename to iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 9dbb2df59a5..7ff1dd39b15 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.session.subscription;
+package org.apache.iotdb.session.subscription.consumer;
 
 import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -29,12 +29,21 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+/**
+ * The {@link SubscriptionPushConsumer} corresponds to the push consumption mode in the message
+ * queue.
+ *
+ * <p>User code is triggered by newly arrived data events and only needs to pre-configure message
+ * acknowledgment strategy ({@link #ackStrategy}) and consumption handling logic ({@link
+ * #consumeListener}).
+ *
+ * <p>User code does not need to manually commit the consumption progress.
+ */
 public class SubscriptionPushConsumer extends SubscriptionConsumer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPushConsumer.class);
@@ -42,7 +51,8 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
   private final AckStrategy ackStrategy;
   private final ConsumeListener consumeListener;
 
-  private ScheduledExecutorService autoPollWorkerExecutor;
+  private final long autoPollIntervalMs;
+  private final long autoPollTimeoutMs;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(true);
 
@@ -51,6 +61,9 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
 
     this.ackStrategy = builder.ackStrategy;
     this.consumeListener = builder.consumeListener;
+
+    this.autoPollIntervalMs = builder.autoPollIntervalMs;
+    this.autoPollTimeoutMs = builder.autoPollTimeoutMs;
   }
 
   public SubscriptionPushConsumer(final Properties config) {
@@ -61,17 +74,38 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
         (ConsumeListener)
             config.getOrDefault(
                 ConsumerConstant.CONSUME_LISTENER_KEY,
-                (ConsumeListener) message -> ConsumeResult.SUCCESS));
+                (ConsumeListener) message -> ConsumeResult.SUCCESS),
+        (Long)
+            config.getOrDefault(
+                ConsumerConstant.AUTO_POLL_INTERVAL_MS_KEY,
+                ConsumerConstant.AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE),
+        (Long)
+            config.getOrDefault(
+                ConsumerConstant.AUTO_POLL_TIMEOUT_MS_KEY,
+                ConsumerConstant.AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE));
   }
 
   private SubscriptionPushConsumer(
       final Properties config,
       final AckStrategy ackStrategy,
-      final ConsumeListener consumeListener) {
-    super(new Builder().ackStrategy(ackStrategy), config);
+      final ConsumeListener consumeListener,
+      final long autoPollIntervalMs,
+      final long autoPollTimeoutMs) {
+    super(
+        new Builder()
+            .ackStrategy(ackStrategy)
+            .consumeListener(consumeListener)
+            .autoPollIntervalMs(autoPollIntervalMs)
+            .autoPollTimeoutMs(autoPollTimeoutMs),
+        config);
 
     this.ackStrategy = ackStrategy;
     this.consumeListener = consumeListener;
+
+    this.autoPollIntervalMs =
+        Math.max(autoPollIntervalMs, ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
+    this.autoPollTimeoutMs =
+        Math.max(autoPollTimeoutMs, ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE);
   }
 
   /////////////////////////////// open & close ///////////////////////////////
@@ -82,9 +116,7 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
     }
 
     super.open();
-
-    launchAutoPollWorker();
-
+    submitAutoPollWorker();
     isClosed.set(false);
   }
 
@@ -94,12 +126,8 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
       return;
     }
 
-    try {
-      shutdownAutoPollWorker();
-      super.close();
-    } finally {
-      isClosed.set(true);
-    }
+    super.close();
+    isClosed.set(true);
   }
 
   @Override
@@ -109,31 +137,66 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
 
   /////////////////////////////// auto poll ///////////////////////////////
 
-  @SuppressWarnings("unsafeThreadSchedule")
-  private void launchAutoPollWorker() {
-    autoPollWorkerExecutor =
-        Executors.newSingleThreadScheduledExecutor(
-            r -> {
-              final Thread t =
-                  new Thread(Thread.currentThread().getThreadGroup(), r, "PushConsumerWorker", 0);
-              if (!t.isDaemon()) {
-                t.setDaemon(true);
+  private void submitAutoPollWorker() {
+    final ScheduledFuture<?>[] future = new ScheduledFuture<?>[1];
+    future[0] =
+        SubscriptionExecutorServiceManager.submitAutoPollWorker(
+            () -> {
+              if (isClosed()) {
+                if (Objects.nonNull(future[0])) {
+                  future[0].cancel(false);
+                  LOGGER.info("SubscriptionPushConsumer {} cancel auto poll worker", this);
+                }
+                return;
               }
-              if (t.getPriority() != Thread.NORM_PRIORITY) {
-                t.setPriority(Thread.NORM_PRIORITY);
-              }
-              return t;
-            });
-    autoPollWorkerExecutor.scheduleWithFixedDelay(
-        new AutoPollWorker(),
-        generateRandomInitialDelayMs(ConsumerConstant.PUSH_CONSUMER_AUTO_POLL_INTERVAL_MS),
-        ConsumerConstant.PUSH_CONSUMER_AUTO_POLL_INTERVAL_MS,
-        TimeUnit.MILLISECONDS);
+              new AutoPollWorker().run();
+            },
+            autoPollIntervalMs);
+    LOGGER.info("SubscriptionPushConsumer {} submit auto poll worker", this);
   }
 
-  private void shutdownAutoPollWorker() {
-    autoPollWorkerExecutor.shutdown();
-    autoPollWorkerExecutor = null;
+  class AutoPollWorker implements Runnable {
+    @Override
+    public void run() {
+      if (isClosed()) {
+        return;
+      }
+
+      try {
+        // Poll all subscribed topics by passing an empty set
+        final List<SubscriptionMessage> messages = poll(Collections.emptySet(), autoPollTimeoutMs);
+
+        if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
+          ack(messages);
+        }
+
+        final List<SubscriptionMessage> messagesToAck = new ArrayList<>();
+        final List<SubscriptionMessage> messagesToNack = new ArrayList<>();
+        for (final SubscriptionMessage message : messages) {
+          final ConsumeResult consumeResult;
+          try {
+            consumeResult = consumeListener.onReceive(message);
+            if (consumeResult.equals(ConsumeResult.SUCCESS)) {
+              messagesToAck.add(message);
+            } else {
+              LOGGER.warn("Consumer listener result failure when consuming message: {}", message);
+              messagesToNack.add(message);
+            }
+          } catch (final Exception e) {
+            LOGGER.warn(
+                "Consumer listener raised an exception while consuming message: {}", message, e);
+            messagesToNack.add(message);
+          }
+        }
+
+        if (ackStrategy.equals(AckStrategy.AFTER_CONSUME)) {
+          ack(messagesToAck);
+          nack(messagesToNack);
+        }
+      } catch (final Exception e) {
+        LOGGER.warn("something unexpected happened when auto poll messages...", e);
+      }
+    }
   }
 
   /////////////////////////////// builder ///////////////////////////////
@@ -143,62 +206,97 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
     private AckStrategy ackStrategy = AckStrategy.defaultValue();
     private ConsumeListener consumeListener = message -> ConsumeResult.SUCCESS;
 
-    public SubscriptionPushConsumer.Builder host(final String host) {
+    private long autoPollIntervalMs = ConsumerConstant.AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE;
+    private long autoPollTimeoutMs = ConsumerConstant.AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE;
+
+    @Override
+    public Builder host(final String host) {
       super.host(host);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder port(final int port) {
+    @Override
+    public Builder port(final int port) {
       super.port(port);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder username(final String username) {
+    @Override
+    public Builder nodeUrls(final List<String> nodeUrls) {
+      super.nodeUrls(nodeUrls);
+      return this;
+    }
+
+    @Override
+    public Builder username(final String username) {
       super.username(username);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder password(final String password) {
+    @Override
+    public Builder password(final String password) {
       super.password(password);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder consumerId(final String consumerId) {
+    @Override
+    public Builder consumerId(final String consumerId) {
       super.consumerId(consumerId);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder consumerGroupId(final String consumerGroupId) {
+    @Override
+    public Builder consumerGroupId(final String consumerGroupId) {
       super.consumerGroupId(consumerGroupId);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder heartbeatIntervalMs(final long heartbeatIntervalMs) {
+    @Override
+    public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) {
       super.heartbeatIntervalMs(heartbeatIntervalMs);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder endpointsSyncIntervalMs(
-        final long endpointsSyncIntervalMs) {
+    @Override
+    public Builder endpointsSyncIntervalMs(final long endpointsSyncIntervalMs) {
       super.endpointsSyncIntervalMs(endpointsSyncIntervalMs);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder fileSaveDir(final String fileSaveDir) {
-      this.fileSaveDir = fileSaveDir;
+    @Override
+    public Builder fileSaveDir(final String fileSaveDir) {
+      super.fileSaveDir(fileSaveDir);
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder ackStrategy(final AckStrategy ackStrategy) {
+    @Override
+    public Builder fileSaveFsync(final boolean fileSaveFsync) {
+      super.fileSaveFsync(fileSaveFsync);
+      return this;
+    }
+
+    public Builder ackStrategy(final AckStrategy ackStrategy) {
       this.ackStrategy = ackStrategy;
       return this;
     }
 
-    public SubscriptionPushConsumer.Builder consumeListener(final ConsumeListener consumeListener) {
+    public Builder consumeListener(final ConsumeListener consumeListener) {
       this.consumeListener = consumeListener;
       return this;
     }
 
+    public Builder autoPollIntervalMs(final long autoPollIntervalMs) {
+      this.autoPollIntervalMs =
+          Math.max(autoPollIntervalMs, ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
+      return this;
+    }
+
+    public Builder autoPollTimeoutMs(final long autoPollTimeoutMs) {
+      this.autoPollTimeoutMs =
+          Math.max(autoPollTimeoutMs, ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE);
+      return this;
+    }
+
     @Override
     public SubscriptionPullConsumer buildPullConsumer() {
       throw new SubscriptionException(
@@ -210,51 +308,4 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
       return new SubscriptionPushConsumer(this);
     }
   }
-
-  /////////////////////////////// auto poll worker ///////////////////////////////
-
-  class AutoPollWorker implements Runnable {
-    @Override
-    public void run() {
-      if (isClosed()) {
-        return;
-      }
-
-      try {
-        // Poll all subscribed topics by passing an empty set
-        final List<SubscriptionMessage> messages =
-            poll(Collections.emptySet(), ConsumerConstant.PUSH_CONSUMER_AUTO_POLL_TIME_OUT_MS);
-
-        if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
-          ack(messages);
-        }
-
-        final List<SubscriptionMessage> messagesToAck = new ArrayList<>();
-        final List<SubscriptionMessage> messagesToNack = new ArrayList<>();
-        for (final SubscriptionMessage message : messages) {
-          final ConsumeResult consumeResult;
-          try {
-            consumeResult = consumeListener.onReceive(message);
-            if (consumeResult.equals(ConsumeResult.SUCCESS)) {
-              messagesToAck.add(message);
-            } else {
-              LOGGER.warn("Consumer listener result failure when consuming message: {}", message);
-              messagesToNack.add(message);
-            }
-          } catch (final Exception e) {
-            LOGGER.warn(
-                "Consumer listener raised an exception while consuming message: {}", message, e);
-            messagesToNack.add(message);
-          }
-        }
-
-        if (ackStrategy.equals(AckStrategy.AFTER_CONSUME)) {
-          ack(messagesToAck);
-          nack(messagesToNack);
-        }
-      } catch (final Exception e) {
-        LOGGER.warn("something unexpected happened when auto poll messages...", e);
-      }
-    }
-  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
index c5a12942718..4a592f8a86f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEventBinaryCache.java
@@ -93,7 +93,7 @@ public class SubscriptionEventBinaryCache {
 
   private SubscriptionEventBinaryCache() {
     final long initMemorySizeInBytes =
-        PipeResourceManager.memory().getTotalMemorySizeInBytes() / 10;
+        PipeResourceManager.memory().getTotalMemorySizeInBytes() / 20;
     final long maxMemorySizeInBytes =
         (long)
             (PipeResourceManager.memory().getTotalMemorySizeInBytes()
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 59de17e371e..b78f7dcb6fb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -241,7 +241,7 @@ public class CommonConfig {
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; // 3 minutes
   private long twoStageAggregateSenderEndPointsCacheInMs = 3 * 60 * 1000L; // 3 minutes
 
-  private float subscriptionCacheMemoryUsagePercentage = 0.1F;
+  private float subscriptionCacheMemoryUsagePercentage = 0.2F;
 
   private int subscriptionSubtaskExecutorMaxThreadNum =
       Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
index 210ef5edbdb..c6556076698 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
@@ -158,7 +158,7 @@ public class ConsumerGroupMetaKeeper {
   @Override
   public String toString() {
     return "ConsumerGroupMetaKeeper{"
-        + "consumerGroupIDToConsumerGroupMetaMap="
+        + "consumerGroupIdToConsumerGroupMetaMap="
         + consumerGroupIdToConsumerGroupMetaMap
         + '}';
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
index 5f3c4fc1803..f1bb9b46085 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
@@ -128,7 +128,7 @@ public class ConsumerMeta {
   @Override
   public String toString() {
     return "ConsumerMeta{"
-        + "consumerID='"
+        + "consumerId='"
         + consumerId
         + "', creationTime="
         + creationTime

Reply via email to