This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 62e1da198be Subscription: [Bug] When using SubscriptionPullConsumer to
consume data, an error is reported when configuring
ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322) (#15358)
62e1da198be is described below
commit 62e1da198beb3d97b88f0a4a8e84c098c1994834
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 21 18:41:56 2025 +0800
Subscription: [Bug] When using SubscriptionPullConsumer to consume data, an
error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the
cluster address. (#15322) (#15358)
* [Bug] When using SubscriptionPullConsumer to consume data, an error is
reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster
address. (#15322)
### Description
When using SubscriptionPullConsumer to consume data, an error is reported
when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.
### Code:
```
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
consumerConfig.put(ConsumerConstant.NODE_URLS_KEY,
Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667"));
consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
try (SubscriptionPullConsumer pullConsumer = new
SubscriptionPullConsumer(consumerConfig)) {
pullConsumer.open();
pullConsumer.subscribe("my_topic");
while (true) {
List<SubscriptionMessage> messages =
pullConsumer.poll(10000);
for (final SubscriptionMessage message : messages) {
final short messageType = message.getMessageType();
if
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
for (final SubscriptionSessionDataSet dataSet :
message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
final RowRecord record = dataSet.next();
System.out.println(record);
}
}
}
}
}
}
```
### Error:
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException:
Cluster has no available subscription providers to connect with initial
endpoints [TEndPoint(ip:localhost, port:0)]
at
org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123)
at
org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260)
at
org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112)
at
org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222)
at
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
### Reason
A Properties object cannot hold null values, and the
AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder,
final Properties properties) constructor filled in default values for the host
and port whenever they were not configured. As a result, the condition if
(Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the
AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder)
constructor always evaluated to true, even when only
ConsumerConstant.NODE_URLS_KEY was configured.
##### Key changed/added classes
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
* fixup
---------
Co-authored-by: lixiaobao <[email protected]>
---
.../session/subscription/consumer/SubscriptionConsumer.java | 10 +++-------
.../subscription/consumer/SubscriptionPullConsumer.java | 2 +-
.../subscription/consumer/SubscriptionPushConsumer.java | 2 +-
3 files changed, 5 insertions(+), 9 deletions(-)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 07dc546dca1..324797967fb 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -188,12 +188,8 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
protected SubscriptionConsumer(final Builder builder, final Properties
properties) {
this(
builder
- .host(
- (String)
- properties.getOrDefault(ConsumerConstant.HOST_KEY,
SessionConfig.DEFAULT_HOST))
- .port(
- (Integer)
- properties.getOrDefault(ConsumerConstant.PORT_KEY,
SessionConfig.DEFAULT_PORT))
+ .host((String) properties.get(ConsumerConstant.HOST_KEY))
+ .port((Integer) properties.get(ConsumerConstant.PORT_KEY))
.nodeUrls((List<String>)
properties.get(ConsumerConstant.NODE_URLS_KEY))
.username(
(String)
@@ -1407,7 +1403,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
return this;
}
- public Builder port(final int port) {
+ public Builder port(final Integer port) {
this.port = port;
return this;
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 75d8119ea04..cc56df89782 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -301,7 +301,7 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
}
@Override
- public Builder port(final int port) {
+ public Builder port(final Integer port) {
super.port(port);
return this;
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 327596d474f..d3c34b7ecaf 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -234,7 +234,7 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
}
@Override
- public Builder port(final int port) {
+ public Builder port(final Integer port) {
super.port(port);
return this;
}