This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2db087dc0fd Subscription: randomly generate consumer group id and
consumer id if not exist & validate topic config ahead on DN (#12467)
2db087dc0fd is described below
commit 2db087dc0fd6f585fe088cf3df41363fb5b12ebf
Author: V_Galaxy <[email protected]>
AuthorDate: Mon May 6 20:24:53 2024 +0800
Subscription: randomly generate consumer group id and consumer id if not
exist & validate topic config ahead on DN (#12467)
---
.../it/dual/IoTDBSubscriptionTopicIT.java | 47 ++++++++++++++++++++++
.../rpc/subscription/config/ConsumerConfig.java | 8 ++++
.../response/PipeSubscribeHandshakeResp.java | 26 +++++++++---
.../session/subscription/SubscriptionConsumer.java | 30 +++++++++-----
.../subscription/SubscriptionEndpointsSyncer.java | 9 +++--
.../session/subscription/SubscriptionProvider.java | 25 +++++++++---
.../SubscriptionSessionConnection.java | 6 +--
.../config/executor/ClusterConfigTaskExecutor.java | 31 +++++++++-----
.../receiver/SubscriptionReceiverV1.java | 39 +++++++++++++-----
9 files changed, 172 insertions(+), 49 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 929c6576e2e..ea48ab9950b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.subscription.it.dual;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -31,6 +34,7 @@ import
org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
import org.apache.tsfile.write.record.Tablet;
import org.awaitility.Awaitility;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -90,6 +94,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
e.printStackTrace();
fail(e.getMessage());
}
+ assertTopicCount(1);
// Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -197,6 +202,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
e.printStackTrace();
fail(e.getMessage());
}
+ assertTopicCount(1);
// Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -300,6 +306,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
e.printStackTrace();
fail(e.getMessage());
}
+ assertTopicCount(1);
// Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -429,6 +436,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
e.printStackTrace();
fail(e.getMessage());
}
+ assertTopicCount(3);
// Subscribe on sender and insert on receiver
final Set<String> topics = new HashSet<>();
@@ -508,4 +516,43 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
thread.join();
}
}
+
+ @Test
+ public void testTopicInvalidConfig() throws Exception {
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+
+ // Scenario 1: invalid time
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ final Properties properties = new Properties();
+ properties.put(TopicConstant.START_TIME_KEY, "2024-01-32");
+ properties.put(TopicConstant.END_TIME_KEY, "now");
+ session.createTopic("topic1", properties);
+ fail();
+ } catch (final Exception ignored) {
+ }
+ assertTopicCount(0);
+
+ // Scenario 2: test when 'start-time' is greater than 'end-time'
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ final Properties properties = new Properties();
+ properties.put(TopicConstant.START_TIME_KEY, "2001.01.01T08:00:00");
+ properties.put(TopicConstant.END_TIME_KEY, "2000.01.01T08:00:00");
+ session.createTopic("topic1", properties);
+ fail();
+ } catch (final Exception ignored) {
+ }
+ assertTopicCount(0);
+ }
+
+ private void assertTopicCount(final int count) throws Exception {
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowTopicInfo> showTopicResult =
+ client.showTopic(new TShowTopicReq()).topicInfoList;
+ Assert.assertEquals(count, showTopicResult.size());
+ }
+ }
}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 976ad9c4952..9b904f07caf 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -58,4 +58,12 @@ public class ConsumerConfig extends PipeParameters {
public String getConsumerGroupId() {
return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
}
+
+ public void setConsumerId(final String consumerId) {
+ attributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+ }
+
+ public void setConsumerGroupId(final String consumerGroupId) {
+ attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId);
+ }
}
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
index 58d7baee483..91de9db6e79 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHandshakeResp.java
@@ -35,23 +35,37 @@ import java.util.Objects;
public class PipeSubscribeHandshakeResp extends TPipeSubscribeResp {
- // dataNodeId -> clientRpcEndPoint
private transient int dataNodeId;
+ private transient String consumerId;
+
+ private transient String consumerGroupId;
+
public int getDataNodeId() {
return dataNodeId;
}
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ public String getConsumerGroupId() {
+ return consumerGroupId;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
/**
* Serialize the incoming parameters into `PipeSubscribeHandshakeResp`,
called by the subscription
* server.
*/
- public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(TSStatus status, int dataNodeId) {
+ public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(
+ TSStatus status, int dataNodeId, String consumerId, String consumerGroupId) {
final PipeSubscribeHandshakeResp resp = new PipeSubscribeHandshakeResp();
resp.dataNodeId = dataNodeId;
+ resp.consumerId = consumerId;
+ resp.consumerGroupId = consumerGroupId;
resp.status = status;
resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
@@ -60,6 +74,8 @@ public class PipeSubscribeHandshakeResp extends
TPipeSubscribeResp {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(dataNodeId, outputStream);
+ ReadWriteIOUtils.write(consumerId, outputStream);
+ ReadWriteIOUtils.write(consumerGroupId, outputStream);
resp.body =
Collections.singletonList(
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
@@ -71,10 +87,6 @@ public class PipeSubscribeHandshakeResp extends
TPipeSubscribeResp {
return resp;
}
- public static PipeSubscribeHandshakeResp toTPipeSubscribeResp(TSStatus status) {
- return toTPipeSubscribeResp(status, -1);
- }
-
/** Deserialize `TPipeSubscribeResp` to obtain parameters, called by the
subscription client. */
public static PipeSubscribeHandshakeResp fromTPipeSubscribeResp(
TPipeSubscribeResp handshakeResp) {
@@ -84,6 +96,8 @@ public class PipeSubscribeHandshakeResp extends
TPipeSubscribeResp {
ByteBuffer byteBuffer = handshakeResp.body.get(0);
if (byteBuffer.hasRemaining()) {
resp.dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
+ resp.consumerId = ReadWriteIOUtils.readString(byteBuffer);
+ resp.consumerGroupId = ReadWriteIOUtils.readString(byteBuffer);
}
}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
index ccf73c35983..c7ee70dfcb7 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
@@ -64,8 +64,8 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
private final String username;
private final String password;
- private final String consumerId;
- private final String consumerGroupId;
+ private String consumerId;
+ private String consumerGroupId;
private final long heartbeatIntervalMs;
private final long endpointsSyncIntervalMs;
@@ -322,9 +322,22 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
/////////////////////////////// subscription provider
///////////////////////////////
- SubscriptionProvider constructProvider(final TEndPoint endPoint) {
- return new SubscriptionProvider(
- endPoint, this.username, this.password, this.consumerId, this.consumerGroupId);
+ SubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint)
+ throws TException, IoTDBConnectionException, IOException, StatementExecutionException {
+ final SubscriptionProvider provider =
+ new SubscriptionProvider(
+ endPoint, this.username, this.password, this.consumerId, this.consumerGroupId);
+ provider.handshake();
+
+ // update consumer id and consumer group id if not exist
+ if (Objects.isNull(this.consumerId)) {
+ this.consumerId = provider.getConsumerId();
+ }
+ if (Objects.isNull(this.consumerGroupId)) {
+ this.consumerGroupId = provider.getConsumerGroupId();
+ }
+
+ return provider;
}
/** Caller should ensure that the method is called in the lock {@link
#acquireWriteLock()}. */
@@ -337,12 +350,12 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
final int defaultDataNodeId;
try {
- defaultProvider = constructProvider(endPoint);
- defaultDataNodeId = defaultProvider.handshake();
+ defaultProvider = constructProviderAndHandshake(endPoint);
} catch (final Exception e) {
LOGGER.warn("Failed to create connection with {}, exception: {}",
endPoint, e.getMessage());
continue; // try next endpoint
}
+ defaultDataNodeId = defaultProvider.getDataNodeId();
addProvider(defaultDataNodeId, defaultProvider);
final Map<Integer, TEndPoint> allEndPoints;
@@ -363,8 +376,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
final SubscriptionProvider provider;
try {
- provider = constructProvider(entry.getValue());
- provider.handshake();
+ provider = constructProviderAndHandshake(entry.getValue());
} catch (final Exception e) {
LOGGER.warn(
"Failed to create connection with {}, exception: {}, will retry
later...",
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
index d7aea1981d5..c5550a6ba51 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
@@ -76,13 +76,14 @@ public class SubscriptionEndpointsSyncer implements
Runnable {
final SubscriptionProvider provider =
consumer.getProvider(entry.getKey());
if (Objects.isNull(provider)) {
// new provider
- final SubscriptionProvider newProvider = consumer.constructProvider(entry.getValue());
+ final TEndPoint endPoint = entry.getValue();
+ final SubscriptionProvider newProvider;
try {
- newProvider.handshake();
+ newProvider = consumer.constructProviderAndHandshake(endPoint);
} catch (final Exception e) {
LOGGER.warn(
- "Failed to create connection with subscription provider {}, exception: {}, will retry later...",
- newProvider,
+ "Failed to create connection with endpoint {}, exception: {}, will retry later...",
+ endPoint,
e.getMessage());
continue; // retry later
}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
index 45006f43f64..ca822e8fa8e 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
+import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
import org.apache.thrift.TException;
@@ -34,8 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
final class SubscriptionProvider extends SubscriptionSession {
- private final String consumerId;
- private final String consumerGroupId;
+ private String consumerId;
+ private String consumerGroupId;
private final AtomicBoolean isClosed = new AtomicBoolean(true);
private final AtomicBoolean isAvailable = new AtomicBoolean(false);
@@ -56,10 +57,10 @@ final class SubscriptionProvider extends
SubscriptionSession {
this.consumerGroupId = consumerGroupId;
}
- synchronized int handshake()
+ synchronized void handshake()
throws IoTDBConnectionException, TException, IOException,
StatementExecutionException {
if (!isClosed.get()) {
- return -1;
+ return;
}
super.open();
@@ -67,11 +68,15 @@ final class SubscriptionProvider extends
SubscriptionSession {
final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY,
consumerGroupId);
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
- dataNodeId = getSessionConnection().handshake(new ConsumerConfig(consumerAttributes));
+
+ final PipeSubscribeHandshakeResp resp =
+ getSessionConnection().handshake(new ConsumerConfig(consumerAttributes));
+ dataNodeId = resp.getDataNodeId();
+ consumerId = resp.getConsumerId();
+ consumerGroupId = resp.getConsumerGroupId();
isClosed.set(false);
setAvailable();
- return dataNodeId;
}
@Override
@@ -112,6 +117,14 @@ final class SubscriptionProvider extends
SubscriptionSession {
return dataNodeId;
}
+ String getConsumerId() {
+ return consumerId;
+ }
+
+ String getConsumerGroupId() {
+ return consumerGroupId;
+ }
+
TEndPoint getEndPoint() {
return endPoint;
}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index 891cd0fc4ff..204f680c1fb 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -90,14 +90,12 @@ public class SubscriptionSessionConnection extends
SessionConnection {
return endPoints;
}
- public int handshake(ConsumerConfig consumerConfig)
+ public PipeSubscribeHandshakeResp handshake(ConsumerConfig consumerConfig)
throws TException, IOException, StatementExecutionException {
TPipeSubscribeResp resp =
client.pipeSubscribe(PipeSubscribeHandshakeReq.toTPipeSubscribeReq(consumerConfig));
RpcUtils.verifySuccess(resp.status);
- PipeSubscribeHandshakeResp handshakeResp =
- PipeSubscribeHandshakeResp.fromTPipeSubscribeResp(resp);
- return handshakeResp.getDataNodeId();
+ return PipeSubscribeHandshakeResp.fromTPipeSubscribeResp(resp);
}
public void heartbeat() throws TException, StatementExecutionException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a470b02b9da..26f35e0cd63 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
@@ -1688,7 +1689,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
createPipeStatement.getProcessorAttributes(),
createPipeStatement.getConnectorAttributes());
} catch (Exception e) {
- LOGGER.info("Failed to validate pipe statement, because {}", e.getMessage(), e);
+ LOGGER.info("Failed to validate create pipe statement, because {}", e.getMessage(), e);
future.setException(
new IoTDBException(e.getMessage(),
TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
@@ -1746,7 +1747,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
PipeAgent.plugin().validateConnector(pipeName,
alterPipeStatement.getConnectorAttributes());
}
} catch (Exception e) {
- LOGGER.info("Failed to validate pipe statement, because {}", e.getMessage(), e);
+ LOGGER.info("Failed to validate alter pipe statement, because {}", e.getMessage(), e);
future.setException(
new IoTDBException(e.getMessage(),
TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
@@ -1928,18 +1929,30 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement
createTopicStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+
+ final String topicName = createTopicStatement.getTopicName();
+ final Map<String, String> topicAttributes = createTopicStatement.getTopicAttributes();
+
+ // Validate topic config
+ final TopicMeta temporaryTopicMeta =
+ new TopicMeta(topicName, System.currentTimeMillis(), topicAttributes);
+ try {
+ PipeAgent.plugin().validateExtractor(temporaryTopicMeta.generateExtractorAttributes());
+ PipeAgent.plugin().validateProcessor(temporaryTopicMeta.generateProcessorAttributes());
+ } catch (Exception e) {
+ LOGGER.info("Failed to validate create topic statement, because {}", e.getMessage(), e);
+ future.setException(
+ new IoTDBException(e.getMessage(), TSStatusCode.CREATE_TOPIC_ERROR.getStatusCode()));
+ return future;
+ }
+
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCreateTopicReq req =
- new TCreateTopicReq()
- .setTopicName(createTopicStatement.getTopicName())
- .setTopicAttributes(createTopicStatement.getTopicAttributes());
+ new TCreateTopicReq().setTopicName(topicName).setTopicAttributes(topicAttributes);
final TSStatus tsStatus = configNodeClient.createTopic(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- LOGGER.warn(
- "Failed to create topic {} in config node, status is {}.",
- createTopicStatement.getTopicName(),
- tsStatus);
+ LOGGER.warn("Failed to create topic {} in config node, status is {}.", topicName, tsStatus);
future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 5d9a473c059..cfcfbbec8dd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -68,10 +68,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
public class SubscriptionReceiverV1 implements SubscriptionReceiver {
@@ -150,7 +152,10 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
- RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, exceptionMessage));
+ RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, exceptionMessage),
+ -1,
+ "",
+ "");
}
}
@@ -160,6 +165,17 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
ConsumerConfig existedConsumerConfig = consumerConfigThreadLocal.get();
ConsumerConfig consumerConfig = req.getConsumerConfig();
+ String consumerId = consumerConfig.getConsumerId();
+ if (Objects.isNull(consumerId)) {
+ consumerId = UUID.randomUUID().toString();
+ consumerConfig.setConsumerId(consumerId);
+ }
+ String consumerGroupId = consumerConfig.getConsumerGroupId();
+ if (Objects.isNull(consumerGroupId)) {
+ consumerGroupId = UUID.randomUUID().toString();
+ consumerConfig.setConsumerGroupId(consumerGroupId);
+ }
+
if (Objects.isNull(existedConsumerConfig)) {
consumerConfigThreadLocal.set(consumerConfig);
} else {
@@ -175,8 +191,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
}
// create consumer if not existed
- if (!SubscriptionAgent.consumer()
- .isConsumerExisted(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())) {
+ if (!SubscriptionAgent.consumer().isConsumerExisted(consumerGroupId, consumerId)) {
createConsumer(consumerConfig);
} else {
LOGGER.info(
@@ -189,7 +204,8 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
"Subscription: consumer {} handshake successfully, data node id: {}",
req.getConsumerConfig(),
dataNodeId);
- return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, dataNodeId);
+ return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+ RpcUtils.SUCCESS_STATUS, dataNodeId, consumerId, consumerGroupId);
}
private TPipeSubscribeResp
handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq req) {
@@ -201,7 +217,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
"Subscription: something unexpected happened when heartbeat: %s,
req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
- return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+ return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HEARTBEAT_ERROR,
exceptionMessage));
}
}
@@ -230,7 +246,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
"Subscription: something unexpected happened when subscribing:
%s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
- return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+ return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR,
exceptionMessage));
}
}
@@ -262,7 +278,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
"Subscription: something unexpected happened when unsubscribing:
%s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
- return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+ return PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR,
exceptionMessage));
}
}
@@ -297,8 +313,9 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
"Subscription: something unexpected happened when polling: %s,
req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
- return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
- RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR,
exceptionMessage));
+ return PipeSubscribePollResp.toTPipeSubscribeResp(
+ RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR,
exceptionMessage),
+ Collections.emptyList());
}
}
@@ -369,7 +386,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
"Subscription: something unexpected happened when committing:
%s, req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
- return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+ return PipeSubscribeCommitResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_COMMIT_ERROR,
exceptionMessage));
}
}
@@ -404,7 +421,7 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
"Subscription: something unexpected happened when closing: %s,
req: %s",
e.getMessage(), req);
LOGGER.warn(exceptionMessage);
- return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(
+ return PipeSubscribeCloseResp.toTPipeSubscribeResp(
RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_COMMIT_ERROR,
exceptionMessage));
}
}