This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 62e1da198be Subscription: [Bug] When using SubscriptionPullConsumer to 
consume data, an error is reported when configuring 
ConsumerConstant.NODE_URLS_KEY as the cluster address. (#15322) (#15358)
62e1da198be is described below

commit 62e1da198beb3d97b88f0a4a8e84c098c1994834
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 21 18:41:56 2025 +0800

    Subscription: [Bug] When using SubscriptionPullConsumer to consume data, an 
error is reported when configuring ConsumerConstant.NODE_URLS_KEY as the 
cluster address. (#15322) (#15358)
    
    * [Bug] When using SubscriptionPullConsumer to consume data, an error is 
reported when configuring ConsumerConstant.NODE_URLS_KEY as the cluster 
address. (#15322)
    
    ### Description
    When using SubscriptionPullConsumer to consume data, an error is reported 
when configuring ConsumerConstant.NODE_URLS_KEY as the cluster address.
    
    
    ### Code:
    
    ```
    Properties consumerConfig = new Properties();
            consumerConfig.put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
            consumerConfig.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
            consumerConfig.put(ConsumerConstant.NODE_URLS_KEY, 
Arrays.asList("192.168.1.1:6667","192.168.1.2:6667","192.168.1.3:6667"));
            consumerConfig.put(ConsumerConstant.USERNAME_KEY, "root");
            consumerConfig.put(ConsumerConstant.PASSWORD_KEY, "root");
            try (SubscriptionPullConsumer pullConsumer = new 
SubscriptionPullConsumer(consumerConfig)) {
                pullConsumer.open();
                pullConsumer.subscribe("my_topic");
                while (true) {
                    List<SubscriptionMessage> messages = 
pullConsumer.poll(10000);
                    for (final SubscriptionMessage message : messages) {
                        final short messageType = message.getMessageType();
                        if 
(SubscriptionMessageType.isValidatedMessageType(messageType)) {
                            for (final SubscriptionSessionDataSet dataSet : 
message.getSessionDataSetsHandler()) {
                                while (dataSet.hasNext()) {
                                    final RowRecord record = dataSet.next();
                                    System.out.println(record);
                                }
                            }
                        }
                    }
                }
            }
    
    ```
    
    ### Error:
    
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException: 
Cluster has no available subscription providers to connect with initial 
endpoints [TEndPoint(ip:localhost, port:0)]
    
            at 
org.apache.iotdb.session.subscription.consumer.SubscriptionProviders.openProviders(SubscriptionProviders.java:123)
            at 
org.apache.iotdb.session.subscription.consumer.SubscriptionConsumer.open(SubscriptionConsumer.java:260)
            at 
org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer.open(SubscriptionPullConsumer.java:112)
            at 
org.apache.iotdb.session.SessionTest.testNullCluster(SessionTest.java:1222)
            at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
            at java.base/java.lang.reflect.Method.invoke(Method.java:580)
            at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
            at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
            at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
            at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
            at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
            at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
            at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
            at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
            at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
            at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
            at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
            at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
            at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
            at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
            at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
            at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
            at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
            at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
            at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
            at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
            at 
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
            at 
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
            at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
            at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
            at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
    
    
    ### Reason
    Since the value property of Properties cannot be set to null, and the 
AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder, 
final Properties properties) constructor sets default values, the condition if 
(Objects.nonNull(builder.host) || Objects.nonNull(builder.port)) in the 
AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) 
constructor will always evaluate to true.
    
    ##### Key changed/added classes
    
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
    
    * fixup
    
    ---------
    
    Co-authored-by: lixiaobao <[email protected]>
---
 .../session/subscription/consumer/SubscriptionConsumer.java    | 10 +++-------
 .../subscription/consumer/SubscriptionPullConsumer.java        |  2 +-
 .../subscription/consumer/SubscriptionPushConsumer.java        |  2 +-
 3 files changed, 5 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 07dc546dca1..324797967fb 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -188,12 +188,8 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
   protected SubscriptionConsumer(final Builder builder, final Properties 
properties) {
     this(
         builder
-            .host(
-                (String)
-                    properties.getOrDefault(ConsumerConstant.HOST_KEY, 
SessionConfig.DEFAULT_HOST))
-            .port(
-                (Integer)
-                    properties.getOrDefault(ConsumerConstant.PORT_KEY, 
SessionConfig.DEFAULT_PORT))
+            .host((String) properties.get(ConsumerConstant.HOST_KEY))
+            .port((Integer) properties.get(ConsumerConstant.PORT_KEY))
             .nodeUrls((List<String>) 
properties.get(ConsumerConstant.NODE_URLS_KEY))
             .username(
                 (String)
@@ -1407,7 +1403,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
       return this;
     }
 
-    public Builder port(final int port) {
+    public Builder port(final Integer port) {
       this.port = port;
       return this;
     }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 75d8119ea04..cc56df89782 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -301,7 +301,7 @@ public class SubscriptionPullConsumer extends 
SubscriptionConsumer {
     }
 
     @Override
-    public Builder port(final int port) {
+    public Builder port(final Integer port) {
       super.port(port);
       return this;
     }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 327596d474f..d3c34b7ecaf 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -234,7 +234,7 @@ public class SubscriptionPushConsumer extends 
SubscriptionConsumer {
     }
 
     @Override
-    public Builder port(final int port) {
+    public Builder port(final Integer port) {
       super.port(port);
       return this;
     }

Reply via email to