This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit b2177f1c1e3242ecb3d53b917769f8aa147ee712
Author: gosonzhang <[email protected]>
AuthorDate: Fri Dec 4 17:36:55 2020 +0800

    [TUBEMQ-441]An error occurred when using the Tubemq class to create a sink 
table (#339)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../org/apache/flink/connectors/tubemq/Tubemq.java | 28 ++++++++++++++++------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
 
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
index 69eb279..dc265b1 100644
--- 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
+++ 
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
@@ -41,6 +41,9 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 public class Tubemq extends ConnectorDescriptor {
 
     @Nullable
+    private boolean consumerRole = true;
+
+    @Nullable
     private String topic;
 
     @Nullable
@@ -74,6 +77,16 @@ public class Tubemq extends ConnectorDescriptor {
     }
 
     /**
+     * Sets the client role to be used.
+     *
+     * @param isConsumer The client role if consumer.
+     */
+    public Tubemq asConsumer(boolean isConsumer) {
+        this.consumerRole = isConsumer;
+        return this;
+    }
+
+    /**
      * Sets the address of tubemq master to connect.
      *
      * @param master The address of tubemq master.
@@ -133,13 +146,14 @@ public class Tubemq extends ConnectorDescriptor {
         if (master != null) {
             descriptorProperties.putString(CONNECTOR_MASTER, master);
         }
-
-        if (group != null) {
-            descriptorProperties.putString(CONNECTOR_GROUP, group);
-        }
-
-        if (tids != null) {
-            descriptorProperties.putString(CONNECTOR_TIDS, tids);
+        if (consumerRole) {
+            if (group != null) {
+                descriptorProperties.putString(CONNECTOR_GROUP, group);
+            }
+
+            if (tids != null) {
+                descriptorProperties.putString(CONNECTOR_TIDS, tids);
+            }
         }
 
         descriptorProperties.putPropertiesWithPrefix(CONNECTOR_PROPERTIES, 
properties);

Reply via email to