This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db087dc0fd Subscription: randomly generate consumer group id and 
consumer id if not exist & validate topic config ahead on DN (#12467)
2db087dc0fd is described below

commit 2db087dc0fd6f585fe088cf3df41363fb5b12ebf
Author: V_Galaxy <[email protected]>
AuthorDate: Mon May 6 20:24:53 2024 +0800

    Subscription: randomly generate consumer group id and consumer id if not 
exist & validate topic config ahead on DN (#12467)
---
 .../it/dual/IoTDBSubscriptionTopicIT.java          | 47 ++++++++++++++++++++++
 .../rpc/subscription/config/ConsumerConfig.java    |  8 ++++
 .../response/PipeSubscribeHandshakeResp.java       | 26 +++++++++---
 .../session/subscription/SubscriptionConsumer.java | 30 +++++++++-----
 .../subscription/SubscriptionEndpointsSyncer.java  |  9 +++--
 .../session/subscription/SubscriptionProvider.java | 25 +++++++++---
 .../SubscriptionSessionConnection.java             |  6 +--
 .../config/executor/ClusterConfigTaskExecutor.java | 31 +++++++++-----
 .../receiver/SubscriptionReceiverV1.java           | 39 +++++++++++++-----
 9 files changed, 172 insertions(+), 49 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 929c6576e2e..ea48ab9950b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.subscription.it.dual;
 
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -31,6 +34,7 @@ import 
org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
 
 import org.apache.tsfile.write.record.Tablet;
 import org.awaitility.Awaitility;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -90,6 +94,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       e.printStackTrace();
       fail(e.getMessage());
     }
+    assertTopicCount(1);
 
     // Subscribe on sender and insert on receiver
     final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -197,6 +202,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       e.printStackTrace();
       fail(e.getMessage());
     }
+    assertTopicCount(1);
 
     // Subscribe on sender and insert on receiver
     final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -300,6 +306,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       e.printStackTrace();
       fail(e.getMessage());
     }
+    assertTopicCount(1);
 
     // Subscribe on sender and insert on receiver
     final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -429,6 +436,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       e.printStackTrace();
       fail(e.getMessage());
     }
+    assertTopicCount(3);
 
     // Subscribe on sender and insert on receiver
     final Set<String> topics = new HashSet<>();
@@ -508,4 +516,43 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       thread.join();
     }
   }
+
+  @Test
+  public void testTopicInvalidConfig() throws Exception {
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+
+    // Scenario 1: invalid time
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      final Properties properties = new Properties();
+      properties.put(TopicConstant.START_TIME_KEY, "2024-01-32");
+      properties.put(TopicConstant.END_TIME_KEY, "now");
+      session.createTopic("topic1", properties);
+      fail();
+    } catch (final Exception ignored) {
+    }
+    assertTopicCount(0);
+
+    // Scenario 2: test when 'start-time' is greater than 'end-time'
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      final Properties properties = new Properties();
+      properties.put(TopicConstant.START_TIME_KEY, "2001.01.01T08:00:00");
+      properties.put(TopicConstant.END_TIME_KEY, "2000.01.01T08:00:00");
+      session.createTopic("topic1", properties);
+      fail();
+    } catch (final Exception ignored) {
+    }
+    assertTopicCount(0);
+  }
+
+  private void assertTopicCount(final int count) throws Exception {
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowTopicInfo> showTopicResult =
+          client.showTopic(new TShowTopicReq()).topicInfoList;
+      Assert.assertEquals(count, showTopicResult.size());
+    }
+  }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 976ad9c4952..9b904f07caf 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -58,4 +58,12 @@ public class ConsumerConfig extends PipeParameters {
   public String getConsumerGroupId() {
     return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
   }
+
+  public void setConsumerId(final String consumerId) {
+    attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+  }
+
+  public void setConsumerGroupId(final String consumerGroupId) {
+    attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId);
+  }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
index 58d7baee483..91de9db6e79 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
@@ -35,23 +35,37 @@ import java.util.Objects;
 
 public class PipeSubscribeHandshakeResp extends TPipeSubscribeResp {
 
-  // dataNodeId -> clientRpcEndPoint
   private transient int dataNodeId;
 
+  private transient String consumerId;
+
+  private transient String consumerGroupId;
+
   public int getDataNodeId() {
     return dataNodeId;
   }
 
+  public String getConsumerId() {
+    return consumerId;
+  }
+
+  public String getConsumerGroupId() {
+    return consumerGroupId;
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   /**
    * Serialize the incoming parameters into `PipeSubscribeHandshakeResp`, 
called by the subscription
    * server.
    */
-  public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(TSStatus 
status, int dataNodeId) {
+  public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(
+      TSStatus status, int dataNodeId, String consumerId, String 
consumerGroupId) {
     final PipeSubscribeHandshakeResp resp = new PipeSubscribeHandshakeResp();
 
     resp.dataNodeId = dataNodeId;
+    resp.consumerId = consumerId;
+    resp.consumerGroupId = consumerGroupId;
 
     resp.status = status;
     resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
@@ -60,6 +74,8 @@ public class PipeSubscribeHandshakeResp extends 
TPipeSubscribeResp {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write(dataNodeId, outputStream);
+      ReadWriteIOUtils.write(consumerId, outputStream);
+      ReadWriteIOUtils.write(consumerGroupId, outputStream);
       resp.body =
           Collections.singletonList(
               ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
@@ -71,10 +87,6 @@ public class PipeSubscribeHandshakeResp extends 
TPipeSubscribeResp {
     return resp;
   }
 
-  public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(TSStatus 
status) {
-    return toTPipeSubscribeResp(status, -1);
-  }
-
   /** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the 
subscription client. */
   public static PipeSubscribeHandshakeResp fromTPipeSubscribeResp(
       TPipeSubscribeResp handshakeResp) {
@@ -84,6 +96,8 @@ public class PipeSubscribeHandshakeResp extends 
TPipeSubscribeResp {
       ByteBuffer byteBuffer = handshakeResp.body.get(0);
       if (byteBuffer.hasRemaining()) {
         resp.dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+        resp.consumerId = ReadWriteIOUtils.readString(byteBuffer);
+        resp.consumerGroupId = ReadWriteIOUtils.readString(byteBuffer);
       }
     }
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
index ccf73c35983..c7ee70dfcb7 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
@@ -64,8 +64,8 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
   private final String username;
   private final String password;
 
-  private final String consumerId;
-  private final String consumerGroupId;
+  private String consumerId;
+  private String consumerGroupId;
 
   private final long heartbeatIntervalMs;
   private final long endpointsSyncIntervalMs;
@@ -322,9 +322,22 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
 
   /////////////////////////////// subscription provider 
///////////////////////////////
 
-  SubscriptionProvider constructProvider(final TEndPoint endPoint) {
-    return new SubscriptionProvider(
-        endPoint, this.username, this.password, this.consumerId, 
this.consumerGroupId);
+  SubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint)
+      throws TException, IoTDBConnectionException, IOException, 
StatementExecutionException {
+    final SubscriptionProvider provider =
+        new SubscriptionProvider(
+            endPoint, this.username, this.password, this.consumerId, 
this.consumerGroupId);
+    provider.handshake();
+
+    // update consumer id and consumer group id if not exist
+    if (Objects.isNull(this.consumerId)) {
+      this.consumerId = provider.getConsumerId();
+    }
+    if (Objects.isNull(this.consumerGroupId)) {
+      this.consumerGroupId = provider.getConsumerGroupId();
+    }
+
+    return provider;
   }
 
   /** Caller should ensure that the method is called in the lock {@link 
#acquireWriteLock()}. */
@@ -337,12 +350,12 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
       final int defaultDataNodeId;
 
       try {
-        defaultProvider = constructProvider(endPoint);
-        defaultDataNodeId = defaultProvider.handshake();
+        defaultProvider = constructProviderAndHandshake(endPoint);
       } catch (final Exception e) {
         LOGGER.warn("Failed to create connection with {}, exception: {}", 
endPoint, e.getMessage());
         continue; // try next endpoint
       }
+      defaultDataNodeId = defaultProvider.getDataNodeId();
       addProvider(defaultDataNodeId, defaultProvider);
 
       final Map<Integer, TEndPoint> allEndPoints;
@@ -363,8 +376,7 @@ public abstract class SubscriptionConsumer implements 
AutoCloseable {
 
         final SubscriptionProvider provider;
         try {
-          provider = constructProvider(entry.getValue());
-          provider.handshake();
+          provider = constructProviderAndHandshake(entry.getValue());
         } catch (final Exception e) {
           LOGGER.warn(
               "Failed to create connection with {}, exception: {}, will retry 
later...",
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
index d7aea1981d5..c5550a6ba51 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
@@ -76,13 +76,14 @@ public class SubscriptionEndpointsSyncer implements 
Runnable {
       final SubscriptionProvider provider = 
consumer.getProvider(entry.getKey());
       if (Objects.isNull(provider)) {
         // new provider
-        final SubscriptionProvider newProvider = 
consumer.constructProvider(entry.getValue());
+        final TEndPoint endPoint = entry.getValue();
+        final SubscriptionProvider newProvider;
         try {
-          newProvider.handshake();
+          newProvider = consumer.constructProviderAndHandshake(endPoint);
         } catch (final Exception e) {
           LOGGER.warn(
-              "Failed to create connection with subscription provider {}, 
exception: {}, will retry later...",
-              newProvider,
+              "Failed to create connection with endpoint {}, exception: {}, 
will retry later...",
+              endPoint,
               e.getMessage());
           continue; // retry later
         }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
index 45006f43f64..ca822e8fa8e 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
+import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
 
 import org.apache.thrift.TException;
 
@@ -34,8 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 final class SubscriptionProvider extends SubscriptionSession {
 
-  private final String consumerId;
-  private final String consumerGroupId;
+  private String consumerId;
+  private String consumerGroupId;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(true);
   private final AtomicBoolean isAvailable = new AtomicBoolean(false);
@@ -56,10 +57,10 @@ final class SubscriptionProvider extends 
SubscriptionSession {
     this.consumerGroupId = consumerGroupId;
   }
 
-  synchronized int handshake()
+  synchronized void handshake()
       throws IoTDBConnectionException, TException, IOException, 
StatementExecutionException {
     if (!isClosed.get()) {
-      return -1;
+      return;
     }
 
     super.open();
@@ -67,11 +68,15 @@ final class SubscriptionProvider extends 
SubscriptionSession {
     final Map<String, String> consumerAttributes = new HashMap<>();
     consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, 
consumerGroupId);
     consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
-    dataNodeId = getSessionConnection().handshake(new 
ConsumerConfig(consumerAttributes));
+
+    final PipeSubscribeHandshakeResp resp =
+        getSessionConnection().handshake(new 
ConsumerConfig(consumerAttributes));
+    dataNodeId = resp.getDataNodeId();
+    consumerId = resp.getConsumerId();
+    consumerGroupId = resp.getConsumerGroupId();
 
     isClosed.set(false);
     setAvailable();
-    return dataNodeId;
   }
 
   @Override
@@ -112,6 +117,14 @@ final class SubscriptionProvider extends 
SubscriptionSession {
     return dataNodeId;
   }
 
+  String getConsumerId() {
+    return consumerId;
+  }
+
+  String getConsumerGroupId() {
+    return consumerGroupId;
+  }
+
   TEndPoint getEndPoint() {
     return endPoint;
   }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index 891cd0fc4ff..204f680c1fb 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -90,14 +90,12 @@ public class SubscriptionSessionConnection extends 
SessionConnection {
     return endPoints;
   }
 
-  public int handshake(ConsumerConfig consumerConfig)
+  public PipeSubscribeHandshakeResp handshake(ConsumerConfig consumerConfig)
       throws TException, IOException, StatementExecutionException {
     TPipeSubscribeResp resp =
         
client.pipeSubscribe(PipeSubscribeHandshakeReq.toTPipeSubscribeReq(consumerConfig));
     RpcUtils.verifySuccess(resp.status);
-    PipeSubscribeHandshakeResp handshakeResp =
-        PipeSubscribeHandshakeResp.fromTPipeSubscribeResp(resp);
-    return handshakeResp.getDataNodeId();
+    return PipeSubscribeHandshakeResp.fromTPipeSubscribeResp(resp);
   }
 
   public void heartbeat() throws TException, StatementExecutionException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a470b02b9da..26f35e0cd63 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFClassLoader;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
@@ -1688,7 +1689,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               createPipeStatement.getProcessorAttributes(),
               createPipeStatement.getConnectorAttributes());
     } catch (Exception e) {
-      LOGGER.info("Failed to validate pipe statement, because {}", 
e.getMessage(), e);
+      LOGGER.info("Failed to validate create pipe statement, because {}", 
e.getMessage(), e);
       future.setException(
           new IoTDBException(e.getMessage(), 
TSStatusCode.PIPE_ERROR.getStatusCode()));
       return future;
@@ -1746,7 +1747,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         PipeAgent.plugin().validateConnector(pipeName, 
alterPipeStatement.getConnectorAttributes());
       }
     } catch (Exception e) {
-      LOGGER.info("Failed to validate pipe statement, because {}", 
e.getMessage(), e);
+      LOGGER.info("Failed to validate alter pipe statement, because {}", 
e.getMessage(), e);
       future.setException(
           new IoTDBException(e.getMessage(), 
TSStatusCode.PIPE_ERROR.getStatusCode()));
       return future;
@@ -1928,18 +1929,30 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   @Override
   public SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement 
createTopicStatement) {
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+
+    final String topicName = createTopicStatement.getTopicName();
+    final Map<String, String> topicAttributes = 
createTopicStatement.getTopicAttributes();
+
+    // Validate topic config
+    final TopicMeta temporaryTopicMeta =
+        new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes);
+    try {
+      
PipeAgent.plugin().validateExtractor(temporaryTopicMeta.generateExtractorAttributes());
+      
PipeAgent.plugin().validateProcessor(temporaryTopicMeta.generateProcessorAttributes());
+    } catch (Exception e) {
+      LOGGER.info("Failed to validate create topic statement, because {}", 
e.getMessage(), e);
+      future.setException(
+          new IoTDBException(e.getMessage(), 
TSStatusCode.CREATE_TOPIC_ERROR.getStatusCode()));
+      return future;
+    }
+
     try (final ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       final TCreateTopicReq req =
-          new TCreateTopicReq()
-              .setTopicName(createTopicStatement.getTopicName())
-              .setTopicAttributes(createTopicStatement.getTopicAttributes());
+          new 
TCreateTopicReq().setTopicName(topicName).setTopicAttributes(topicAttributes);
       final TSStatus tsStatus = configNodeClient.createTopic(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
-        LOGGER.warn(
-            "Failed to create topic {} in config node, status is {}.",
-            createTopicStatement.getTopicName(),
-            tsStatus);
+        LOGGER.warn("Failed to create topic {} in config node, status is {}.", 
topicName, tsStatus);
         future.setException(new IoTDBException(tsStatus.message, 
tsStatus.code));
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 5d9a473c059..cfcfbbec8dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -68,10 +68,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 public class SubscriptionReceiverV1 implements SubscriptionReceiver {
@@ -150,7 +152,10 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
               e.getMessage(), req);
       LOGGER.warn(exceptionMessage);
       return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
-          RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, 
exceptionMessage));
+          RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, 
exceptionMessage),
+          -1,
+          "",
+          "");
     }
   }
 
@@ -160,6 +165,17 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     ConsumerConfig existedConsumerConfig = consumerConfigThreadLocal.get();
     ConsumerConfig consumerConfig = req.getConsumerConfig();
 
+    String consumerId = consumerConfig.getConsumerId();
+    if (Objects.isNull(consumerId)) {
+      consumerId = UUID.randomUUID().toString();
+      consumerConfig.setConsumerId(consumerId);
+    }
+    String consumerGroupId = consumerConfig.getConsumerGroupId();
+    if (Objects.isNull(consumerGroupId)) {
+      consumerGroupId = UUID.randomUUID().toString();
+      consumerConfig.setConsumerGroupId(consumerGroupId);
+    }
+
     if (Objects.isNull(existedConsumerConfig)) {
       consumerConfigThreadLocal.set(consumerConfig);
     } else {
@@ -175,8 +191,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     }
 
     // create consumer if not existed
-    if (!SubscriptionAgent.consumer()
-        .isConsumerExisted(consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId())) {
+    if (!SubscriptionAgent.consumer().isConsumerExisted(consumerGroupId, 
consumerId)) {
       createConsumer(consumerConfig);
     } else {
       LOGGER.info(
@@ -189,7 +204,8 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
         "Subscription: consumer {} handshake successfully, data node id: {}",
         req.getConsumerConfig(),
         dataNodeId);
-    return 
PipeSubscribeHandshakeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, 
dataNodeId);
+    return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+        RpcUtils.SUCCESS_STATUS, dataNodeId, consumerId, consumerGroupId);
   }
 
   private TPipeSubscribeResp 
handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq req) {
@@ -201,7 +217,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
               "Subscription: something unexpected happened when heartbeat: %s, 
req: %s",
               e.getMessage(), req);
       LOGGER.warn(exceptionMessage);
-      return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+      return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
           RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HEARTBEAT_ERROR, 
exceptionMessage));
     }
   }
@@ -230,7 +246,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
               "Subscription: something unexpected happened when subscribing: 
%s, req: %s",
               e.getMessage(), req);
       LOGGER.warn(exceptionMessage);
-      return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+      return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(
           RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR, 
exceptionMessage));
     }
   }
@@ -262,7 +278,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
               "Subscription: something unexpected happened when unsubscribing: 
%s, req: %s",
               e.getMessage(), req);
       LOGGER.warn(exceptionMessage);
-      return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+      return PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(
           RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR, 
exceptionMessage));
     }
   }
@@ -297,8 +313,9 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
               "Subscription: something unexpected happened when polling: %s, 
req: %s",
               e.getMessage(), req);
       LOGGER.warn(exceptionMessage);
-      return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
-          RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, 
exceptionMessage));
+      return PipeSubscribePollResp.toTPipeSubscribeResp(
+          RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, 
exceptionMessage),
+          Collections.emptyList());
     }
   }
 
@@ -369,7 +386,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
               "Subscription: something unexpected happened when committing: 
%s, req: %s",
               e.getMessage(), req);
       LOGGER.warn(exceptionMessage);
-      return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+      return PipeSubscribeCommitResp.toTPipeSubscribeResp(
           RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_COMMIT_ERROR, 
exceptionMessage));
     }
   }
@@ -404,7 +421,7 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
               "Subscription: something unexpected happened when closing: %s, 
req: %s",
               e.getMessage(), req);
       LOGGER.warn(exceptionMessage);
-      return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+      return PipeSubscribeCloseResp.toTPipeSubscribeResp(
           RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_COMMIT_ERROR, 
exceptionMessage));
     }
   }

Reply via email to