This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b20ab8accf2 Subscription: implemented runtime permission check (#15376)
b20ab8accf2 is described below
commit b20ab8accf2c3ee652445f43170c62a5d17ca936
Author: VGalaxies <[email protected]>
AuthorDate: Tue Apr 22 15:34:14 2025 +0800
Subscription: implemented runtime permission check (#15376)
---
.../it/local/IoTDBSubscriptionPermissionIT.java | 236 +++++++++++++++++++++
.../rpc/subscription/config/ConsumerConfig.java | 8 +
.../response/PipeSubscribeHeartbeatResp.java | 48 ++++-
.../SubscriptionSessionConnection.java | 32 ---
.../base/AbstractSubscriptionConsumer.java | 2 +-
.../base/AbstractSubscriptionProvider.java | 13 +-
.../base/AbstractSubscriptionProviders.java | 6 +-
.../iotdb/confignode/manager/ProcedureManager.java | 6 +-
.../subscription/CreateSubscriptionProcedure.java | 21 +-
.../config/executor/ClusterConfigTaskExecutor.java | 4 +-
.../receiver/SubscriptionReceiverV1.java | 47 +++-
.../meta/consumer/ConsumerGroupMeta.java | 21 ++
.../subscription/meta/consumer/ConsumerMeta.java | 12 ++
.../commons/subscription/meta/topic/TopicMeta.java | 9 +-
14 files changed, 404 insertions(+), 61 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionPermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionPermissionIT.java
new file mode 100644
index 00000000000..061366058d9
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionPermissionIT.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.SubscriptionTreeSession;
+import org.apache.iotdb.session.subscription.consumer.AckStrategy;
+import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
+import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
+import org.apache.iotdb.session.subscription.model.Subscription;
+import org.apache.iotdb.session.subscription.model.Topic;
+import
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBSubscriptionPermissionIT extends AbstractSubscriptionLocalIT
{
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionPermissionIT.class);
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Test
+ public void testMetaAccessControl() {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ final String username = "thulab";
+ final String password = "passwd";
+
+ // create user
+ createUser(EnvFactory.getEnv(), username, password);
+
+ // root user
+ try (final SubscriptionTreeSession session = new
SubscriptionTreeSession(host, port)) {
+ session.open();
+ // create topic
+ final String topicName = "topic_root";
+ session.createTopic(topicName);
+ Assert.assertTrue(session.getTopic(topicName).isPresent());
+ Assert.assertEquals(topicName,
session.getTopic(topicName).get().getTopicName());
+ // show topic
+ final Optional<Topic> topic = session.getTopic(topicName);
+ Assert.assertTrue(topic.isPresent());
+ Assert.assertEquals(topicName, topic.get().getTopicName());
+ // drop topic
+ session.dropTopic(topicName);
+ // show subscription
+ final Set<Subscription> subscriptions =
session.getSubscriptions(topicName);
+ Assert.assertTrue(subscriptions.isEmpty());
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // normal user
+ try (final SubscriptionTreeSession session =
+ new SubscriptionTreeSession(
+ host, port, username, password,
SessionConfig.DEFAULT_MAX_FRAME_SIZE)) {
+ session.open();
+ // create topic
+ String topicName = "topic_thulab";
+ session.createTopic(topicName);
+ fail();
+ } catch (final Exception e) {
+
+ }
+
+ // normal user
+ try (final SubscriptionTreeSession session =
+ new SubscriptionTreeSession(
+ host, port, username, password,
SessionConfig.DEFAULT_MAX_FRAME_SIZE)) {
+ session.open();
+ // show topics
+ session.getTopics();
+ fail();
+ } catch (final Exception e) {
+
+ }
+
+ // normal user
+ try (final SubscriptionTreeSession session =
+ new SubscriptionTreeSession(
+ host, port, username, password,
SessionConfig.DEFAULT_MAX_FRAME_SIZE)) {
+ session.open();
+ // show subscriptions
+ session.getSubscriptions();
+ fail();
+ } catch (final Exception e) {
+
+ }
+ }
+
+ @Test
+ public void testRuntimeAccessControl() {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+ final String topicName = "topic1";
+
+ // create user
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ EnvFactory.getEnv(),
+ Arrays.asList("create user `thulab` 'passwd'", "create user `hacker`
'qwerty123'"))) {
+ return;
+ }
+
+ // root user
+ try (final SubscriptionTreeSession session = new
SubscriptionTreeSession(host, port)) {
+ session.open();
+ // create topic
+ session.createTopic(topicName);
+ Assert.assertTrue(session.getTopic(topicName).isPresent());
+ Assert.assertEquals(topicName,
session.getTopic(topicName).get().getTopicName());
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ final AtomicInteger rowCount = new AtomicInteger();
+ try (final SubscriptionTreePushConsumer consumer1 =
+ new SubscriptionTreePushConsumer.Builder()
+ .host(host)
+ .port(port)
+ .username("thulab")
+ .password("passwd")
+ .consumerId("thulab_consumer_1")
+ .consumerGroupId("thulab_consumer_group")
+ .ackStrategy(AckStrategy.AFTER_CONSUME)
+ .consumeListener(
+ message -> {
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ return ConsumeResult.SUCCESS;
+ })
+ .buildPushConsumer();
+ final SubscriptionTreePushConsumer consumer2 =
+ new SubscriptionTreePushConsumer.Builder()
+ .host(host)
+ .port(port)
+ .username("thulab")
+ .password("passwd")
+ .consumerId("thulab_consumer_2")
+ .consumerGroupId("thulab_consumer_group")
+ .ackStrategy(AckStrategy.AFTER_CONSUME)
+ .consumeListener(
+ message -> {
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ return ConsumeResult.SUCCESS;
+ })
+ .buildPushConsumer();
+ final SubscriptionTreePushConsumer consumer3 =
+ new SubscriptionTreePushConsumer.Builder()
+ .host(host)
+ .port(port)
+ .username("hacker")
+ .password("qwerty123")
+ .consumerId("hacker_consumer")
+ .consumerGroupId("thulab_consumer_group")
+ .ackStrategy(AckStrategy.AFTER_CONSUME)
+ .consumeListener(
+ message -> {
+ for (final SubscriptionSessionDataSet dataSet :
+ message.getSessionDataSetsHandler()) {
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ return ConsumeResult.SUCCESS;
+ })
+ .buildPushConsumer()) {
+
+ consumer1.open();
+ consumer1.subscribe(topicName);
+
+ consumer2.open();
+ consumer2.subscribe(topicName);
+
+ consumer3.open();
+ consumer3.subscribe(topicName);
+ fail();
+ } catch (final Exception e) {
+ }
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 3bcb984732c..0ae2c5bbdc8 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -68,6 +68,14 @@ public class ConsumerConfig extends PipeParameters {
return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
}
+ public String getUsername() {
+ return getString(ConsumerConstant.USERNAME_KEY);
+ }
+
+ public String getPassword() {
+ return getString(ConsumerConstant.PASSWORD_KEY);
+ }
+
public void setConsumerId(final String consumerId) {
attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
index 34a927ef29c..62939f20b4f 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.rpc.subscription.payload.response;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
@@ -38,10 +39,16 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
private transient Map<String, TopicConfig> topics = new HashMap<>(); //
subscribed topics
+ private transient Map<Integer, TEndPoint> endPoints = new HashMap<>(); //
available endpoints
+
public Map<String, TopicConfig> getTopics() {
return topics;
}
+ public Map<Integer, TEndPoint> getEndPoints() {
+ return endPoints;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
/**
@@ -63,7 +70,10 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
* server.
*/
public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(
- final TSStatus status, final Map<String, TopicConfig> topics) throws
IOException {
+ final TSStatus status,
+ final Map<String, TopicConfig> topics,
+ final Map<Integer, TEndPoint> endPoints)
+ throws IOException {
final PipeSubscribeHeartbeatResp resp = toTPipeSubscribeResp(status);
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -73,6 +83,12 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
entry.getValue().serialize(outputStream);
}
+ ReadWriteIOUtils.write(endPoints.size(), outputStream);
+ for (final Map.Entry<Integer, TEndPoint> entry : endPoints.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue().getIp(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue().getPort(), outputStream);
+ }
resp.body =
Collections.singletonList(
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
@@ -89,14 +105,27 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
if (Objects.nonNull(heartbeatResp.body)) {
for (final ByteBuffer byteBuffer : heartbeatResp.body) {
if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
- final int size = ReadWriteIOUtils.readInt(byteBuffer);
- final Map<String, TopicConfig> topics = new HashMap<>();
- for (int i = 0; i < size; i++) {
- final String topicName = ReadWriteIOUtils.readString(byteBuffer);
- final TopicConfig topicConfig =
TopicConfig.deserialize(byteBuffer);
- topics.put(topicName, topicConfig);
+ {
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ final Map<String, TopicConfig> topics = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ final String topicName = ReadWriteIOUtils.readString(byteBuffer);
+ final TopicConfig topicConfig =
TopicConfig.deserialize(byteBuffer);
+ topics.put(topicName, topicConfig);
+ }
+ resp.topics = topics;
+ }
+ {
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ final Map<Integer, TEndPoint> endPoints = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ final int nodeId = ReadWriteIOUtils.readInt(byteBuffer);
+ final String ip = ReadWriteIOUtils.readString(byteBuffer);
+ final int port = ReadWriteIOUtils.readInt(byteBuffer);
+ endPoints.put(nodeId, new TEndPoint(ip, port));
+ }
+ resp.endPoints = endPoints;
}
- resp.topics = topics;
break;
}
}
@@ -122,6 +151,7 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
}
final PipeSubscribeHeartbeatResp that = (PipeSubscribeHeartbeatResp) obj;
return Objects.equals(this.topics, that.topics)
+ && Objects.equals(this.endPoints, that.endPoints)
&& Objects.equals(this.status, that.status)
&& this.version == that.version
&& this.type == that.type
@@ -130,6 +160,6 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
@Override
public int hashCode() {
- return Objects.hash(topics, status, version, type, body);
+ return Objects.hash(topics, endPoints, status, version, type, body);
}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index eeb1a54dbb7..3f15f4dcf6a 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -20,9 +20,7 @@
package org.apache.iotdb.session.subscription;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.session.Session;
@@ -31,20 +29,11 @@ import org.apache.iotdb.session.SessionConnection;
import org.apache.thrift.TException;
import java.time.ZoneId;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.function.Supplier;
public class SubscriptionSessionConnection extends SessionConnection {
- private static final String SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
- private static final String NODE_ID_COLUMN_NAME = "NodeID";
- private static final String STATUS_COLUMN_NAME = "Status";
- private static final String IP_COLUMN_NAME = "RpcAddress";
- private static final String PORT_COLUMN_NAME = "RpcPort";
- private static final String REMOVING_STATUS = "Removing";
-
public SubscriptionSessionConnection(
final Session session,
final TEndPoint endPoint,
@@ -66,27 +55,6 @@ public class SubscriptionSessionConnection extends
SessionConnection {
database);
}
- // from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
- public Map<Integer, TEndPoint> fetchAllEndPoints()
- throws IoTDBConnectionException, StatementExecutionException {
- final SessionDataSet dataSet =
session.executeQueryStatement(SHOW_DATA_NODES_COMMAND);
- final SessionDataSet.DataIterator iterator = dataSet.iterator();
- final Map<Integer, TEndPoint> endPoints = new HashMap<>();
- while (iterator.next()) {
- // ignore removing DN
- if (REMOVING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) {
- continue;
- }
- final String ip = iterator.getString(IP_COLUMN_NAME);
- final String port = iterator.getString(PORT_COLUMN_NAME);
- if (ip != null && port != null) {
- endPoints.put(
- iterator.getInt(NODE_ID_COLUMN_NAME), new TEndPoint(ip,
Integer.parseInt(port)));
- }
- }
- return endPoints;
- }
-
public TPipeSubscribeResp pipeSubscribe(final TPipeSubscribeReq req) throws
TException {
return client.pipeSubscribe(req);
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index bc5ad3d443e..83ff755ddfb 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -1379,7 +1379,7 @@ abstract class AbstractSubscriptionConsumer implements
AutoCloseable {
}
for (final AbstractSubscriptionProvider provider : providers) {
try {
- return provider.getSessionConnection().fetchAllEndPoints();
+ return provider.heartbeat().getEndPoints();
} catch (final Exception e) {
LOGGER.warn(
"{} failed to fetch all endpoints from subscription provider {},
try next subscription provider...",
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 1a20f07b0cf..781dacb9738 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -88,6 +88,9 @@ public abstract class AbstractSubscriptionProvider {
private final TEndPoint endPoint;
private int dataNodeId;
+ private final String username;
+ private final String password;
+
protected abstract AbstractSessionBuilder
constructSubscriptionSessionBuilder(
final String host,
final int port,
@@ -109,6 +112,8 @@ public abstract class AbstractSubscriptionProvider {
this.endPoint = endPoint;
this.consumerId = consumerId;
this.consumerGroupId = consumerGroupId;
+ this.username = username;
+ this.password = password;
}
SubscriptionSessionConnection getSessionConnection() {
@@ -156,6 +161,8 @@ public abstract class AbstractSubscriptionProvider {
final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY,
consumerGroupId);
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+ consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
+ consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
final PipeSubscribeHandshakeResp resp =
handshake(new ConsumerConfig(consumerAttributes)); // throw
SubscriptionException
@@ -229,7 +236,7 @@ public abstract class AbstractSubscriptionProvider {
/////////////////////////////// subscription APIs
///////////////////////////////
- Map<String, TopicConfig> heartbeat() throws SubscriptionException {
+ PipeSubscribeHeartbeatResp heartbeat() throws SubscriptionException {
final TPipeSubscribeResp resp;
try {
resp =
getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
@@ -243,9 +250,7 @@ public abstract class AbstractSubscriptionProvider {
throw new SubscriptionConnectionException(e.getMessage(), e);
}
verifyPipeSubscribeSuccess(resp.status);
- final PipeSubscribeHeartbeatResp heartbeatResp =
- PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
- return heartbeatResp.getTopics();
+ return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
}
Map<String, TopicConfig> subscribe(final Set<String> topicNames) throws
SubscriptionException {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProviders.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProviders.java
index 142719df8b1..fe765dab2da 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProviders.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProviders.java
@@ -94,7 +94,7 @@ final class AbstractSubscriptionProviders {
final Map<Integer, TEndPoint> allEndPoints;
try {
- allEndPoints =
defaultProvider.getSessionConnection().fetchAllEndPoints();
+ allEndPoints = defaultProvider.heartbeat().getEndPoints();
} catch (final Exception e) {
LOGGER.warn(
"{} failed to fetch all endpoints from {} because of {}",
consumer, endPoint, e, e);
@@ -244,7 +244,7 @@ final class AbstractSubscriptionProviders {
private void heartbeatInternal(final AbstractSubscriptionConsumer consumer) {
for (final AbstractSubscriptionProvider provider : getAllProviders()) {
try {
- consumer.subscribedTopics = provider.heartbeat();
+ consumer.subscribedTopics = provider.heartbeat().getTopics();
provider.setAvailable();
} catch (final Exception e) {
LOGGER.warn(
@@ -309,7 +309,7 @@ final class AbstractSubscriptionProviders {
} else {
// existing provider
try {
- consumer.subscribedTopics = provider.heartbeat();
+ consumer.subscribedTopics = provider.heartbeat().getTopics();
provider.setAvailable();
} catch (final Exception e) {
LOGGER.warn(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 6f321044e65..c6446490a04 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1662,7 +1662,8 @@ public class ProcedureManager {
return new
TSStatus(TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR.getStatusCode())
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
} else {
- return new
TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode());
+ return new
TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode())
+
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode())
@@ -1683,7 +1684,8 @@ public class ProcedureManager {
return new
TSStatus(TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR.getStatusCode())
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
} else {
- return new
TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode());
+ return new
TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode())
+
.setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
}
} catch (Exception e) {
return new
TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode())
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 4a48ebdd35d..31b852b38bb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
+import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -88,8 +89,13 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);
- // Construct AlterConsumerGroupProcedure
+ final String consumerId = subscribeReq.getConsumerId();
final String consumerGroupId = subscribeReq.getConsumerGroupId();
+ final ConsumerGroupMeta consumerGroupMeta =
+ subscriptionInfo.get().getConsumerGroupMeta(consumerGroupId);
+ final ConsumerMeta consumerMeta =
consumerGroupMeta.getConsumerMeta(consumerId);
+
+ // Construct AlterConsumerGroupProcedure
final ConsumerGroupMeta updatedConsumerGroupMeta =
subscriptionInfo.get().deepCopyConsumerGroupMeta(consumerGroupId);
updatedConsumerGroupMeta.addSubscription(
@@ -101,6 +107,16 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
for (final String topicName : subscribeReq.getTopicNames()) {
final String pipeName =
PipeStaticMeta.generateSubscriptionPipeName(topicName,
consumerGroupId);
+ // check username
+ if (!consumerGroupMeta.allowSubscribeTopicForConsumer(topicName,
consumerId)) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to subscribe topic %s for consumer %s because
inconsistent username under the same consumer group",
+ topicName, consumerId);
+ LOGGER.warn(exceptionMessage);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
if (!subscriptionInfo.get().isTopicSubscribedByConsumerGroup(topicName,
consumerGroupId)
// even if there existed subscription meta, if there is no
corresponding pipe meta, it
// will try to create the pipe
@@ -110,7 +126,8 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
new CreatePipeProcedureV2(
new TCreatePipeReq()
.setPipeName(pipeName)
-
.setExtractorAttributes(topicMeta.generateExtractorAttributes())
+ .setExtractorAttributes(
+
topicMeta.generateExtractorAttributes(consumerMeta.getUsername()))
.setProcessorAttributes(topicMeta.generateProcessorAttributes())
.setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId)),
pipeTaskInfo));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 8b0966a66eb..892a74a4319 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2414,7 +2414,9 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
PipeDataNodeAgent.plugin()
.validate(
"fakePipeName",
- temporaryTopicMeta.generateExtractorAttributes(),
+ // TODO: currently use root to create topic
+ temporaryTopicMeta.generateExtractorAttributes(
+ CommonDescriptor.getInstance().getConfig().getAdminName()),
temporaryTopicMeta.generateProcessorAttributes(),
temporaryTopicMeta.generateConnectorAttributes("fakeConsumerGroupId"));
} catch (final Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 87fde40a432..fe9e1345a17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -19,13 +19,17 @@
package org.apache.iotdb.db.subscription.receiver;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,6 +43,7 @@ import
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetri
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import
org.apache.iotdb.rpc.subscription.exception.SubscriptionPayloadExceedException;
import
org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
@@ -78,7 +83,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -260,13 +267,47 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
// TODO: do something
LOGGER.info("Subscription: consumer {} heartbeat successfully",
consumerConfig);
- return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
- RpcUtils.SUCCESS_STATUS,
+
+ // fetch subscribed topics
+ final Map<String, TopicConfig> topics =
SubscriptionAgent.topic()
.getTopicConfigs(
SubscriptionAgent.consumer()
.getTopicNamesSubscribedByConsumer(
- consumerConfig.getConsumerGroupId(),
consumerConfig.getConsumerId())));
+ consumerConfig.getConsumerGroupId(),
consumerConfig.getConsumerId()));
+
+ // fetch available endpoints
+ final Map<Integer, TEndPoint> endPoints = new HashMap<>();
+ try (final ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ final TShowDataNodesResp resp = configNodeClient.showDataNodes();
+ // refer to org.apache.iotdb.session.NodesSupplier.updateDataNodeList
+
+ for (final TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
+ // ignore removing DN
+ if (Objects.equals(NodeStatus.Removing.getStatus(),
dataNodeInfo.getStatus())) {
+ continue;
+ }
+ final String ip = dataNodeInfo.getRpcAddresss();
+ final int port = dataNodeInfo.getRpcPort();
+ if (ip != null && port != 0) {
+ endPoints.put(dataNodeInfo.getDataNodeId(), new TEndPoint(ip, port));
+ }
+ }
+ } catch (final ClientManagerException | TException e) {
+ LOGGER.warn(
+ "Exception occurred when fetch endpoints for consumer {} in config
node",
+ consumerConfig,
+ e);
+ final String exceptionMessage =
+ String.format(
+ "Subscription: Failed to fetch endpoints for consumer %s in
config node, exception is %s.",
+ consumerConfig, e);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
+ RpcUtils.SUCCESS_STATUS, topics, endPoints);
}
private TPipeSubscribeResp handlePipeSubscribeSubscribe(final
PipeSubscribeSubscribeReq req) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index f7d4901884c..82871269220 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -127,6 +127,10 @@ public class ConsumerGroupMeta {
return consumerIdToConsumerMeta.isEmpty();
}
+ public ConsumerMeta getConsumerMeta(final String consumerId) {
+ return consumerIdToConsumerMeta.get(consumerId);
+ }
+
////////////////////////// subscription //////////////////////////
/**
@@ -159,6 +163,23 @@ public class ConsumerGroupMeta {
return !subscribedConsumerIdSet.isEmpty();
}
+ public boolean allowSubscribeTopicForConsumer(final String topic, final
String consumerId) {
+ if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
+ return false;
+ }
+ final Set<String> subscribedConsumerIdSet =
topicNameToSubscribedConsumerIdSet.get(topic);
+ if (Objects.isNull(subscribedConsumerIdSet)) {
+ return true;
+ }
+ if (subscribedConsumerIdSet.isEmpty()) {
+ return true;
+ }
+ final String subscribedConsumerId =
subscribedConsumerIdSet.iterator().next();
+ return Objects.equals(
+
Objects.requireNonNull(consumerIdToConsumerMeta.get(subscribedConsumerId)).getUsername(),
+
Objects.requireNonNull(consumerIdToConsumerMeta.get(consumerId)).getUsername());
+ }
+
public void addSubscription(final String consumerId, final Set<String>
topics) {
if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
throw new SubscriptionException(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
index f1bb9b46085..152f0b111df 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java
@@ -54,6 +54,18 @@ public class ConsumerMeta {
return consumerId;
}
+ public String getConsumerGroupId() {
+ return config.getConsumerGroupId();
+ }
+
+ public String getUsername() {
+ return config.getUsername();
+ }
+
+ public String getPassword() {
+ return config.getPassword();
+ }
+
public ByteBuffer serialize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index 86d9d0e49e7..3a092d9f80f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.commons.subscription.meta.topic;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
@@ -181,14 +180,16 @@ public class TopicMeta {
/////////////////////////////// utilities ///////////////////////////////
- public Map<String, String> generateExtractorAttributes() {
+ public Map<String, String> generateExtractorAttributes(final String
username) {
final Map<String, String> extractorAttributes = new HashMap<>();
// disable meta sync
extractorAttributes.put("source", "iotdb-source");
extractorAttributes.put("inclusion", "data.insert");
extractorAttributes.put("inclusion.exclusion", "data.delete");
- // Currently use root in subscription pipes
- extractorAttributes.put("username",
CommonDescriptor.getInstance().getConfig().getAdminName());
+ // user
+ extractorAttributes.put("username", username);
+ // TODO: currently set skipif to no-privileges
+ extractorAttributes.put("skipif", "no-privileges");
// sql dialect
extractorAttributes.putAll(config.getAttributeWithSqlDialect());
if (config.isTableTopic()) {