This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b20ab8accf2 Subscription: implemented runtime permission check (#15376)
b20ab8accf2 is described below

commit b20ab8accf2c3ee652445f43170c62a5d17ca936
Author: VGalaxies <[email protected]>
AuthorDate: Tue Apr 22 15:34:14 2025 +0800

    Subscription: implemented runtime permission check (#15376)
---
 .../it/local/IoTDBSubscriptionPermissionIT.java    | 236 +++++++++++++++++++++
 .../rpc/subscription/config/ConsumerConfig.java    |   8 +
 .../response/PipeSubscribeHeartbeatResp.java       |  48 ++++-
 .../SubscriptionSessionConnection.java             |  32 ---
 .../base/AbstractSubscriptionConsumer.java         |   2 +-
 .../base/AbstractSubscriptionProvider.java         |  13 +-
 .../base/AbstractSubscriptionProviders.java        |   6 +-
 .../iotdb/confignode/manager/ProcedureManager.java |   6 +-
 .../subscription/CreateSubscriptionProcedure.java  |  21 +-
 .../config/executor/ClusterConfigTaskExecutor.java |   4 +-
 .../receiver/SubscriptionReceiverV1.java           |  47 +++-
 .../meta/consumer/ConsumerGroupMeta.java           |  21 ++
 .../subscription/meta/consumer/ConsumerMeta.java   |  12 ++
 .../commons/subscription/meta/topic/TopicMeta.java |   9 +-
 14 files changed, 404 insertions(+), 61 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionPermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionPermissionIT.java
new file mode 100644
index 00000000000..061366058d9
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionPermissionIT.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.SubscriptionTreeSession;
+import org.apache.iotdb.session.subscription.consumer.AckStrategy;
+import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
+import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
+import org.apache.iotdb.session.subscription.model.Subscription;
+import org.apache.iotdb.session.subscription.model.Topic;
+import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSubscriptionPermissionIT extends AbstractSubscriptionLocalIT 
{
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionPermissionIT.class);
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp(); // no extra fixture: only runs the setUp inherited from AbstractSubscriptionLocalIT
+  }
+
+  @Test
+  public void testMetaAccessControl() { // meta operations: must work as root, must throw for a plain user
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    final String username = "thulab";
+    final String password = "passwd";
+
+    // create the non-root user used by the "normal user" sections below
+    createUser(EnvFactory.getEnv(), username, password);
+
+    // root user: create, look up and drop a topic, then query the subscriptions of that topic
+    try (final SubscriptionTreeSession session = new 
SubscriptionTreeSession(host, port)) {
+      session.open();
+      // create topic
+      final String topicName = "topic_root";
+      session.createTopic(topicName);
+      Assert.assertTrue(session.getTopic(topicName).isPresent());
+      Assert.assertEquals(topicName, 
session.getTopic(topicName).get().getTopicName());
+      // show topic
+      final Optional<Topic> topic = session.getTopic(topicName);
+      Assert.assertTrue(topic.isPresent());
+      Assert.assertEquals(topicName, topic.get().getTopicName());
+      // drop topic
+      session.dropTopic(topicName);
+      // show subscription (queried after the topic was dropped; must be empty)
+      final Set<Subscription> subscriptions = 
session.getSubscriptions(topicName);
+      Assert.assertTrue(subscriptions.isEmpty());
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage()); // any exception on the root path fails the test
+    }
+
+    // normal user: creating a topic must throw
+    try (final SubscriptionTreeSession session =
+        new SubscriptionTreeSession(
+            host, port, username, password, 
SessionConfig.DEFAULT_MAX_FRAME_SIZE)) {
+      session.open();
+      // create topic
+      String topicName = "topic_thulab";
+      session.createTopic(topicName);
+      fail(); // reached only if createTopic did not throw; its AssertionError is not caught below
+    } catch (final Exception e) {
+      // expected; NOTE(review): any Exception passes here, including one thrown by session.open()
+    }
+
+    // normal user: listing topics must throw
+    try (final SubscriptionTreeSession session =
+        new SubscriptionTreeSession(
+            host, port, username, password, 
SessionConfig.DEFAULT_MAX_FRAME_SIZE)) {
+      session.open();
+      // show topics
+      session.getTopics();
+      fail(); // reached only if getTopics did not throw; its AssertionError is not caught below
+    } catch (final Exception e) {
+      // expected; NOTE(review): any Exception passes here, including one thrown by session.open()
+    }
+
+    // normal user: listing subscriptions must throw
+    try (final SubscriptionTreeSession session =
+        new SubscriptionTreeSession(
+            host, port, username, password, 
SessionConfig.DEFAULT_MAX_FRAME_SIZE)) {
+      session.open();
+      // show subscriptions
+      session.getSubscriptions();
+      fail(); // reached only if getSubscriptions did not throw; its AssertionError is not caught below
+    } catch (final Exception e) {
+      // expected; NOTE(review): any Exception passes here, including one thrown by session.open()
+    }
+  }
+
+  @Test
+  public void testRuntimeAccessControl() { // a consumer of another user must not join an existing group's subscription
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    final String topicName = "topic1";
+
+    // create the two users that the consumers below log in as
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        EnvFactory.getEnv(),
+        Arrays.asList("create user `thulab` 'passwd'", "create user `hacker` 
'qwerty123'"))) {
+      return; // NOTE(review): the test ends here without failing when user creation does not succeed
+    }
+
+    // root user: create the topic that the consumers will subscribe to
+    try (final SubscriptionTreeSession session = new 
SubscriptionTreeSession(host, port)) {
+      session.open();
+      // create topic
+      session.createTopic(topicName);
+      Assert.assertTrue(session.getTopic(topicName).isPresent());
+      Assert.assertEquals(topicName, 
session.getTopic(topicName).get().getTopicName());
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage()); // any exception on the root path fails the test
+    }
+
+    final AtomicInteger rowCount = new AtomicInteger(); // incremented by the listeners; never asserted in this test
+    try (final SubscriptionTreePushConsumer consumer1 = // user "thulab", first consumer of the group
+            new SubscriptionTreePushConsumer.Builder()
+                .host(host)
+                .port(port)
+                .username("thulab")
+                .password("passwd")
+                .consumerId("thulab_consumer_1")
+                .consumerGroupId("thulab_consumer_group")
+                .ackStrategy(AckStrategy.AFTER_CONSUME)
+                .consumeListener(
+                    message -> {
+                      for (final SubscriptionSessionDataSet dataSet :
+                          message.getSessionDataSetsHandler()) {
+                        while (dataSet.hasNext()) {
+                          dataSet.next();
+                          rowCount.addAndGet(1);
+                        }
+                      }
+                      return ConsumeResult.SUCCESS;
+                    })
+                .buildPushConsumer();
+        final SubscriptionTreePushConsumer consumer2 = // user "thulab" again, same group
+            new SubscriptionTreePushConsumer.Builder()
+                .host(host)
+                .port(port)
+                .username("thulab")
+                .password("passwd")
+                .consumerId("thulab_consumer_2")
+                .consumerGroupId("thulab_consumer_group")
+                .ackStrategy(AckStrategy.AFTER_CONSUME)
+                .consumeListener(
+                    message -> {
+                      for (final SubscriptionSessionDataSet dataSet :
+                          message.getSessionDataSetsHandler()) {
+                        while (dataSet.hasNext()) {
+                          dataSet.next();
+                          rowCount.addAndGet(1);
+                        }
+                      }
+                      return ConsumeResult.SUCCESS;
+                    })
+                .buildPushConsumer();
+        final SubscriptionTreePushConsumer consumer3 = // user "hacker", but the same group id as above
+            new SubscriptionTreePushConsumer.Builder()
+                .host(host)
+                .port(port)
+                .username("hacker")
+                .password("qwerty123")
+                .consumerId("hacker_consumer")
+                .consumerGroupId("thulab_consumer_group")
+                .ackStrategy(AckStrategy.AFTER_CONSUME)
+                .consumeListener(
+                    message -> {
+                      for (final SubscriptionSessionDataSet dataSet :
+                          message.getSessionDataSetsHandler()) {
+                        while (dataSet.hasNext()) {
+                          dataSet.next();
+                          rowCount.addAndGet(1);
+                        }
+                      }
+                      return ConsumeResult.SUCCESS;
+                    })
+                .buildPushConsumer()) {
+
+      consumer1.open();
+      consumer1.subscribe(topicName);
+
+      consumer2.open();
+      consumer2.subscribe(topicName);
+
+      consumer3.open();
+      consumer3.subscribe(topicName); // presumably refused: username differs from the group's other consumers
+      fail(); // reached only if nothing above threw; its AssertionError is not caught below
+    } catch (final Exception e) { // expected; NOTE(review): a failure of consumer1/consumer2 also passes here
+    }
+  }
+}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 3bcb984732c..0ae2c5bbdc8 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -68,6 +68,14 @@ public class ConsumerConfig extends PipeParameters {
     return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
   }
 
+  public String getUsername() {
+    return getString(ConsumerConstant.USERNAME_KEY); // value under USERNAME_KEY; missing-key result depends on getString (not shown)
+  }
+
+  public String getPassword() {
+    return getString(ConsumerConstant.PASSWORD_KEY); // value under PASSWORD_KEY; missing-key result depends on getString (not shown)
+  }
+
   public void setConsumerId(final String consumerId) {
     attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
   }
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
index 34a927ef29c..62939f20b4f 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.rpc.subscription.payload.response;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.rpc.subscription.config.TopicConfig;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
@@ -38,10 +39,16 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
 
   private transient Map<String, TopicConfig> topics = new HashMap<>(); // 
subscribed topics
 
+  private transient Map<Integer, TEndPoint> endPoints = new HashMap<>(); // 
available endpoints
+
   public Map<String, TopicConfig> getTopics() {
     return topics;
   }
 
+  public Map<Integer, TEndPoint> getEndPoints() {
+    return endPoints; // the internal map itself, not a copy; filled as node id -> endpoint when a response is parsed
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   /**
@@ -63,7 +70,10 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
    * server.
    */
   public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(
-      final TSStatus status, final Map<String, TopicConfig> topics) throws 
IOException {
+      final TSStatus status,
+      final Map<String, TopicConfig> topics,
+      final Map<Integer, TEndPoint> endPoints)
+      throws IOException {
     final PipeSubscribeHeartbeatResp resp = toTPipeSubscribeResp(status);
 
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -73,6 +83,12 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
         ReadWriteIOUtils.write(entry.getKey(), outputStream);
         entry.getValue().serialize(outputStream);
       }
+      ReadWriteIOUtils.write(endPoints.size(), outputStream);
+      for (final Map.Entry<Integer, TEndPoint> entry : endPoints.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue().getIp(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue().getPort(), outputStream);
+      }
       resp.body =
           Collections.singletonList(
               ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
@@ -89,14 +105,27 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
     if (Objects.nonNull(heartbeatResp.body)) {
       for (final ByteBuffer byteBuffer : heartbeatResp.body) {
         if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
-          final int size = ReadWriteIOUtils.readInt(byteBuffer);
-          final Map<String, TopicConfig> topics = new HashMap<>();
-          for (int i = 0; i < size; i++) {
-            final String topicName = ReadWriteIOUtils.readString(byteBuffer);
-            final TopicConfig topicConfig = 
TopicConfig.deserialize(byteBuffer);
-            topics.put(topicName, topicConfig);
+          {
+            final int size = ReadWriteIOUtils.readInt(byteBuffer);
+            final Map<String, TopicConfig> topics = new HashMap<>();
+            for (int i = 0; i < size; i++) {
+              final String topicName = ReadWriteIOUtils.readString(byteBuffer);
+              final TopicConfig topicConfig = 
TopicConfig.deserialize(byteBuffer);
+              topics.put(topicName, topicConfig);
+            }
+            resp.topics = topics;
+          }
+          {
+            final int size = ReadWriteIOUtils.readInt(byteBuffer);
+            final Map<Integer, TEndPoint> endPoints = new HashMap<>();
+            for (int i = 0; i < size; i++) {
+              final int nodeId = ReadWriteIOUtils.readInt(byteBuffer);
+              final String ip = ReadWriteIOUtils.readString(byteBuffer);
+              final int port = ReadWriteIOUtils.readInt(byteBuffer);
+              endPoints.put(nodeId, new TEndPoint(ip, port));
+            }
+            resp.endPoints = endPoints;
           }
-          resp.topics = topics;
           break;
         }
       }
@@ -122,6 +151,7 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
     }
     final PipeSubscribeHeartbeatResp that = (PipeSubscribeHeartbeatResp) obj;
     return Objects.equals(this.topics, that.topics)
+        && Objects.equals(this.endPoints, that.endPoints)
         && Objects.equals(this.status, that.status)
         && this.version == that.version
         && this.type == that.type
@@ -130,6 +160,6 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
 
   @Override
   public int hashCode() {
-    return Objects.hash(topics, status, version, type, body);
+    return Objects.hash(topics, endPoints, status, version, type, body);
   }
 }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index eeb1a54dbb7..3f15f4dcf6a 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -20,9 +20,7 @@
 package org.apache.iotdb.session.subscription;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
 import org.apache.iotdb.session.Session;
@@ -31,20 +29,11 @@ import org.apache.iotdb.session.SessionConnection;
 import org.apache.thrift.TException;
 
 import java.time.ZoneId;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Supplier;
 
 public class SubscriptionSessionConnection extends SessionConnection {
 
-  private static final String SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
-  private static final String NODE_ID_COLUMN_NAME = "NodeID";
-  private static final String STATUS_COLUMN_NAME = "Status";
-  private static final String IP_COLUMN_NAME = "RpcAddress";
-  private static final String PORT_COLUMN_NAME = "RpcPort";
-  private static final String REMOVING_STATUS = "Removing";
-
   public SubscriptionSessionConnection(
       final Session session,
       final TEndPoint endPoint,
@@ -66,27 +55,6 @@ public class SubscriptionSessionConnection extends 
SessionConnection {
         database);
   }
 
-  // from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
-  public Map<Integer, TEndPoint> fetchAllEndPoints()
-      throws IoTDBConnectionException, StatementExecutionException {
-    final SessionDataSet dataSet = 
session.executeQueryStatement(SHOW_DATA_NODES_COMMAND);
-    final SessionDataSet.DataIterator iterator = dataSet.iterator();
-    final Map<Integer, TEndPoint> endPoints = new HashMap<>();
-    while (iterator.next()) {
-      // ignore removing DN
-      if (REMOVING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) {
-        continue;
-      }
-      final String ip = iterator.getString(IP_COLUMN_NAME);
-      final String port = iterator.getString(PORT_COLUMN_NAME);
-      if (ip != null && port != null) {
-        endPoints.put(
-            iterator.getInt(NODE_ID_COLUMN_NAME), new TEndPoint(ip, 
Integer.parseInt(port)));
-      }
-    }
-    return endPoints;
-  }
-
   public TPipeSubscribeResp pipeSubscribe(final TPipeSubscribeReq req) throws 
TException {
     return client.pipeSubscribe(req);
   }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index bc5ad3d443e..83ff755ddfb 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -1379,7 +1379,7 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
     }
     for (final AbstractSubscriptionProvider provider : providers) {
       try {
-        return provider.getSessionConnection().fetchAllEndPoints();
+        return provider.heartbeat().getEndPoints();
       } catch (final Exception e) {
         LOGGER.warn(
             "{} failed to fetch all endpoints from subscription provider {}, 
try next subscription provider...",
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 1a20f07b0cf..781dacb9738 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -88,6 +88,9 @@ public abstract class AbstractSubscriptionProvider {
   private final TEndPoint endPoint;
   private int dataNodeId;
 
+  private final String username;
+  private final String password;
+
   protected abstract AbstractSessionBuilder 
constructSubscriptionSessionBuilder(
       final String host,
       final int port,
@@ -109,6 +112,8 @@ public abstract class AbstractSubscriptionProvider {
     this.endPoint = endPoint;
     this.consumerId = consumerId;
     this.consumerGroupId = consumerGroupId;
+    this.username = username;
+    this.password = password;
   }
 
   SubscriptionSessionConnection getSessionConnection() {
@@ -156,6 +161,8 @@ public abstract class AbstractSubscriptionProvider {
     final Map<String, String> consumerAttributes = new HashMap<>();
     consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, 
consumerGroupId);
     consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+    consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
+    consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
 
     final PipeSubscribeHandshakeResp resp =
         handshake(new ConsumerConfig(consumerAttributes)); // throw 
SubscriptionException
@@ -229,7 +236,7 @@ public abstract class AbstractSubscriptionProvider {
 
   /////////////////////////////// subscription APIs 
///////////////////////////////
 
-  Map<String, TopicConfig> heartbeat() throws SubscriptionException {
+  PipeSubscribeHeartbeatResp heartbeat() throws SubscriptionException {
     final TPipeSubscribeResp resp;
     try {
       resp = 
getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
@@ -243,9 +250,7 @@ public abstract class AbstractSubscriptionProvider {
       throw new SubscriptionConnectionException(e.getMessage(), e);
     }
     verifyPipeSubscribeSuccess(resp.status);
-    final PipeSubscribeHeartbeatResp heartbeatResp =
-        PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
-    return heartbeatResp.getTopics();
+    return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
   }
 
   Map<String, TopicConfig> subscribe(final Set<String> topicNames) throws 
SubscriptionException {
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProviders.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProviders.java
index 142719df8b1..fe765dab2da 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProviders.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProviders.java
@@ -94,7 +94,7 @@ final class AbstractSubscriptionProviders {
 
       final Map<Integer, TEndPoint> allEndPoints;
       try {
-        allEndPoints = 
defaultProvider.getSessionConnection().fetchAllEndPoints();
+        allEndPoints = defaultProvider.heartbeat().getEndPoints();
       } catch (final Exception e) {
         LOGGER.warn(
             "{} failed to fetch all endpoints from {} because of {}", 
consumer, endPoint, e, e);
@@ -244,7 +244,7 @@ final class AbstractSubscriptionProviders {
   private void heartbeatInternal(final AbstractSubscriptionConsumer consumer) {
     for (final AbstractSubscriptionProvider provider : getAllProviders()) {
       try {
-        consumer.subscribedTopics = provider.heartbeat();
+        consumer.subscribedTopics = provider.heartbeat().getTopics();
         provider.setAvailable();
       } catch (final Exception e) {
         LOGGER.warn(
@@ -309,7 +309,7 @@ final class AbstractSubscriptionProviders {
       } else {
         // existing provider
         try {
-          consumer.subscribedTopics = provider.heartbeat();
+          consumer.subscribedTopics = provider.heartbeat().getTopics();
           provider.setAvailable();
         } catch (final Exception e) {
           LOGGER.warn(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 6f321044e65..c6446490a04 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1662,7 +1662,8 @@ public class ProcedureManager {
         return new 
TSStatus(TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR.getStatusCode())
             
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
       } else {
-        return new 
TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode());
+        return new 
TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode())
+            
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
       }
     } catch (Exception e) {
       return new 
TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode())
@@ -1683,7 +1684,8 @@ public class ProcedureManager {
         return new 
TSStatus(TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR.getStatusCode())
             
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
       } else {
-        return new 
TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode());
+        return new 
TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode())
+            
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
       }
     } catch (Exception e) {
       return new 
TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode())
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 4a48ebdd35d..31b852b38bb 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
+import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -88,8 +89,13 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
 
     subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);
 
-    // Construct AlterConsumerGroupProcedure
+    final String consumerId = subscribeReq.getConsumerId();
     final String consumerGroupId = subscribeReq.getConsumerGroupId();
+    final ConsumerGroupMeta consumerGroupMeta =
+        subscriptionInfo.get().getConsumerGroupMeta(consumerGroupId);
+    final ConsumerMeta consumerMeta = 
consumerGroupMeta.getConsumerMeta(consumerId);
+
+    // Construct AlterConsumerGroupProcedure
     final ConsumerGroupMeta updatedConsumerGroupMeta =
         subscriptionInfo.get().deepCopyConsumerGroupMeta(consumerGroupId);
     updatedConsumerGroupMeta.addSubscription(
@@ -101,6 +107,16 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
     for (final String topicName : subscribeReq.getTopicNames()) {
       final String pipeName =
           PipeStaticMeta.generateSubscriptionPipeName(topicName, 
consumerGroupId);
+      // check username
+      if (!consumerGroupMeta.allowSubscribeTopicForConsumer(topicName, 
consumerId)) {
+        final String exceptionMessage =
+            String.format(
+                "Failed to subscribe topic %s for consumer %s because 
inconsistent username under the same consumer group",
+                topicName, consumerId);
+        LOGGER.warn(exceptionMessage);
+        throw new SubscriptionException(exceptionMessage);
+      }
+
       if (!subscriptionInfo.get().isTopicSubscribedByConsumerGroup(topicName, 
consumerGroupId)
           // even if there existed subscription meta, if there is no 
corresponding pipe meta, it
           // will try to create the pipe
@@ -110,7 +126,8 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
             new CreatePipeProcedureV2(
                 new TCreatePipeReq()
                     .setPipeName(pipeName)
-                    
.setExtractorAttributes(topicMeta.generateExtractorAttributes())
+                    .setExtractorAttributes(
+                        
topicMeta.generateExtractorAttributes(consumerMeta.getUsername()))
                     
.setProcessorAttributes(topicMeta.generateProcessorAttributes())
                     
.setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId)),
                 pipeTaskInfo));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 8b0966a66eb..892a74a4319 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2414,7 +2414,9 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
       PipeDataNodeAgent.plugin()
           .validate(
               "fakePipeName",
-              temporaryTopicMeta.generateExtractorAttributes(),
+              // TODO: currently use root to create topic
+              temporaryTopicMeta.generateExtractorAttributes(
+                  CommonDescriptor.getInstance().getConfig().getAdminName()),
               temporaryTopicMeta.generateProcessorAttributes(),
               
temporaryTopicMeta.generateConnectorAttributes("fakeConsumerGroupId"));
     } catch (final Exception e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 87fde40a432..fe9e1345a17 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -19,13 +19,17 @@
 
 package org.apache.iotdb.db.subscription.receiver;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,6 +43,7 @@ import 
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetri
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionPayloadExceedException;
 import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
@@ -78,7 +83,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -260,13 +267,47 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     // TODO: do something
 
     LOGGER.info("Subscription: consumer {} heartbeat successfully", 
consumerConfig);
-    return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
-        RpcUtils.SUCCESS_STATUS,
+
+    // fetch subscribed topics
+    final Map<String, TopicConfig> topics =
         SubscriptionAgent.topic()
             .getTopicConfigs(
                 SubscriptionAgent.consumer()
                     .getTopicNamesSubscribedByConsumer(
-                        consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId())));
+                        consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId()));
+
+    // fetch available endpoints
+    final Map<Integer, TEndPoint> endPoints = new HashMap<>();
+    try (final ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      final TShowDataNodesResp resp = configNodeClient.showDataNodes();
+      // refer to org.apache.iotdb.session.NodesSupplier.updateDataNodeList
+
+      for (final TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
+        // ignore removing DN
+        if (Objects.equals(NodeStatus.Removing.getStatus(), 
dataNodeInfo.getStatus())) {
+          continue;
+        }
+        final String ip = dataNodeInfo.getRpcAddresss();
+        final int port = dataNodeInfo.getRpcPort();
+        if (ip != null && port != 0) {
+          endPoints.put(dataNodeInfo.getDataNodeId(), new TEndPoint(ip, port));
+        }
+      }
+    } catch (final ClientManagerException | TException e) {
+      LOGGER.warn(
+          "Exception occurred when fetch endpoints for consumer {} in config 
node",
+          consumerConfig,
+          e);
+      final String exceptionMessage =
+          String.format(
+              "Subscription: Failed to fetch endpoints for consumer %s in 
config node, exception is %s.",
+              consumerConfig, e);
+      throw new SubscriptionException(exceptionMessage);
+    }
+
+    return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
+        RpcUtils.SUCCESS_STATUS, topics, endPoints);
   }
 
   private TPipeSubscribeResp handlePipeSubscribeSubscribe(final 
PipeSubscribeSubscribeReq req) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index f7d4901884c..82871269220 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -127,6 +127,10 @@ public class ConsumerGroupMeta {
     return consumerIdToConsumerMeta.isEmpty();
   }
 
+  public ConsumerMeta getConsumerMeta(final String consumerId) { // Looks up the meta stored in this group under consumerId.
+    return consumerIdToConsumerMeta.get(consumerId); // NOTE(review): presumably null for an unknown id (Map#get semantics) -- confirm the map type; callers should null-check.
+  }
+
   ////////////////////////// subscription //////////////////////////
 
   /**
@@ -159,6 +163,23 @@ public class ConsumerGroupMeta {
     return !subscribedConsumerIdSet.isEmpty();
   }
 
+  public boolean allowSubscribeTopicForConsumer(final String topic, final 
String consumerId) { // Decides whether consumerId may subscribe to topic, based on usernames of existing subscribers.
+    if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
+      return false; // no ConsumerMeta is recorded in this group for consumerId
+    }
+    final Set<String> subscribedConsumerIdSet = 
topicNameToSubscribedConsumerIdSet.get(topic);
+    if (Objects.isNull(subscribedConsumerIdSet)) {
+      return true; // no subscriber set is recorded for this topic
+    }
+    if (subscribedConsumerIdSet.isEmpty()) {
+      return true; // a subscriber set exists but currently holds no consumer ids
+    }
+    final String subscribedConsumerId = 
subscribedConsumerIdSet.iterator().next(); // uses whichever id the iterator yields first; NOTE(review): presumably all current subscribers share one username -- confirm
+    return Objects.equals(
+        
Objects.requireNonNull(consumerIdToConsumerMeta.get(subscribedConsumerId)).getUsername(), // throws NullPointerException if the subscribed id has no meta in this group
+        
Objects.requireNonNull(consumerIdToConsumerMeta.get(consumerId)).getUsername()); // allowed only when both usernames are equal (Objects.equals: two nulls also match)
+  }
+
   public void addSubscription(final String consumerId, final Set<String> 
topics) {
     if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
       throw new SubscriptionException(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
index f1bb9b46085..152f0b111df 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
@@ -54,6 +54,18 @@ public class ConsumerMeta {
     return consumerId;
   }
 
+  public String getConsumerGroupId() { // Returns the consumer group id as reported by this consumer's config.
+    return config.getConsumerGroupId();
+  }
+
+  public String getUsername() { // Returns the username as reported by this consumer's config.
+    return config.getUsername();
+  }
+
+  public String getPassword() { // Returns the password as reported by this consumer's config.
+    return config.getPassword(); // NOTE(review): presumably a credential in usable form -- confirm, and avoid logging the result
+  }
+
   public ByteBuffer serialize() throws IOException {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 86d9d0e49e7..3a092d9f80f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.commons.subscription.meta.topic;
 
-import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.rpc.subscription.config.TopicConfig;
@@ -181,14 +180,16 @@ public class TopicMeta {
 
   /////////////////////////////// utilities ///////////////////////////////
 
-  public Map<String, String> generateExtractorAttributes() {
+  public Map<String, String> generateExtractorAttributes(final String 
username) {
     final Map<String, String> extractorAttributes = new HashMap<>();
     // disable meta sync
     extractorAttributes.put("source", "iotdb-source");
     extractorAttributes.put("inclusion", "data.insert");
     extractorAttributes.put("inclusion.exclusion", "data.delete");
-    // Currently use root in subscription pipes
-    extractorAttributes.put("username", 
CommonDescriptor.getInstance().getConfig().getAdminName());
+    // user
+    extractorAttributes.put("username", username);
+    // TODO: currently set skipif to no-privileges
+    extractorAttributes.put("skipif", "no-privileges");
     // sql dialect
     extractorAttributes.putAll(config.getAttributeWithSqlDialect());
     if (config.isTableTopic()) {


Reply via email to