This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit b2177f1c1e3242ecb3d53b917769f8aa147ee712 Author: gosonzhang <[email protected]> AuthorDate: Fri Dec 4 17:36:55 2020 +0800 [TUBEMQ-441]An error occurred when using the Tubemq class to create a sink table (#339) Co-authored-by: gosonzhang <[email protected]> --- .../org/apache/flink/connectors/tubemq/Tubemq.java | 28 ++++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java index 69eb279..dc265b1 100644 --- a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java +++ b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java @@ -41,6 +41,9 @@ import org.apache.flink.table.descriptors.DescriptorProperties; public class Tubemq extends ConnectorDescriptor { @Nullable + private boolean consumerRole = true; + + @Nullable private String topic; @Nullable @@ -74,6 +77,16 @@ public class Tubemq extends ConnectorDescriptor { } /** + * Sets the client role to be used. + * + * @param isConsumer The client role if consumer. + */ + public Tubemq asConsumer(boolean isConsumer) { + this.consumerRole = isConsumer; + return this; + } + + /** * Sets the address of tubemq master to connect. * * @param master The address of tubemq master. @@ -133,13 +146,14 @@ public class Tubemq extends ConnectorDescriptor { if (master != null) { descriptorProperties.putString(CONNECTOR_MASTER, master); } - - if (group != null) { - descriptorProperties.putString(CONNECTOR_GROUP, group); - } - - if (tids != null) { - descriptorProperties.putString(CONNECTOR_TIDS, tids); + if (consumerRole) { + if (group != null) { + descriptorProperties.putString(CONNECTOR_GROUP, group); + } + + if (tids != null) { + descriptorProperties.putString(CONNECTOR_TIDS, tids); + } } descriptorProperties.putPropertiesWithPrefix(CONNECTOR_PROPERTIES, properties);
