This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 288a2e3 [TUBEMQ-441]An error occurred when using the Tubemq class to
create a sink table (#339)
288a2e3 is described below
commit 288a2e3540a0a2fa3eba4f0f4070b92aa077836b
Author: gosonzhang <[email protected]>
AuthorDate: Fri Dec 4 17:36:55 2020 +0800
[TUBEMQ-441]An error occurred when using the Tubemq class to create a sink
table (#339)
Co-authored-by: gosonzhang <[email protected]>
---
.../org/apache/flink/connectors/tubemq/Tubemq.java | 28 ++++++++++++++++------
1 file changed, 21 insertions(+), 7 deletions(-)
diff --git
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
index 69eb279..dc265b1 100644
---
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
+++
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
@@ -41,6 +41,9 @@ import
org.apache.flink.table.descriptors.DescriptorProperties;
public class Tubemq extends ConnectorDescriptor {
@Nullable
+ private boolean consumerRole = true;
+
+ @Nullable
private String topic;
@Nullable
@@ -74,6 +77,16 @@ public class Tubemq extends ConnectorDescriptor {
}
/**
+ * Sets the client role to be used.
+ *
+ * @param isConsumer The client role if consumer.
+ */
+ public Tubemq asConsumer(boolean isConsumer) {
+ this.consumerRole = isConsumer;
+ return this;
+ }
+
+ /**
* Sets the address of tubemq master to connect.
*
* @param master The address of tubemq master.
@@ -133,13 +146,14 @@ public class Tubemq extends ConnectorDescriptor {
if (master != null) {
descriptorProperties.putString(CONNECTOR_MASTER, master);
}
-
- if (group != null) {
- descriptorProperties.putString(CONNECTOR_GROUP, group);
- }
-
- if (tids != null) {
- descriptorProperties.putString(CONNECTOR_TIDS, tids);
+ if (consumerRole) {
+ if (group != null) {
+ descriptorProperties.putString(CONNECTOR_GROUP, group);
+ }
+
+ if (tids != null) {
+ descriptorProperties.putString(CONNECTOR_TIDS, tids);
+ }
}
descriptorProperties.putPropertiesWithPrefix(CONNECTOR_PROPERTIES,
properties);