This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0e60e726aa [INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId (#9078)
0e60e726aa is described below
commit 0e60e726aa2531f63f6314e6c0430157859bc1db
Author: vernedeng <[email protected]>
AuthorDate: Fri Oct 20 17:29:28 2023 +0800
[INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId (#9078)
---
inlong-distribution/pom.xml | 2 +-
.../pojo/sort/node/provider/TubeMqProvider.java | 4 ++--
.../manager/pojo/source/tubemq/TubeMQSource.java | 6 +++---
.../source/tubemq/TubeMQSourceOperator.java | 2 +-
.../sort/protocol/constant/TubeMQConstant.java | 6 +++---
.../protocol/node/extract/TubeMQExtractNode.java | 24 +++++++++++-----------
inlong-sort/sort-core/pom.xml | 6 ++++++
.../tubemq/table/TubeMQDynamicTableFactory.java | 8 ++++----
.../inlong/sort/tubemq/table/TubeMQOptions.java | 19 +++++++++++------
.../sort-connectors/pulsar/pom.xml | 2 ++
.../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 14 ++++++-------
.../inlong/sort/tubemq/FlinkTubeMQProducer.java | 10 ++++-----
.../tubemq/table/TubeMQDynamicTableFactory.java | 4 ++--
.../inlong/sort/tubemq/table/TubeMQOptions.java | 8 ++++----
.../inlong/sort/tubemq/table/TubeMQTableSink.java | 14 ++++++-------
.../sort/tubemq/table/TubeMQTableSource.java | 18 ++++++++--------
inlong-tubemq/tubemq-client/pom.xml | 2 ++
inlong-tubemq/tubemq-server/pom.xml | 1 +
18 files changed, 84 insertions(+), 66 deletions(-)
diff --git a/inlong-distribution/pom.xml b/inlong-distribution/pom.xml
index 5107b1a0fa..14b4a30804 100644
--- a/inlong-distribution/pom.xml
+++ b/inlong-distribution/pom.xml
@@ -73,7 +73,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
- <version>3.1.0</version>
+ <version>${exec.maven.version}</version>
<configuration>
<executable>${basedir}/script/backup_module_dependencys.sh</executable>
</configuration>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index 64b0c0a71f..daaf5a94fb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -53,9 +53,9 @@ public class TubeMqProvider implements ExtractNodeProvider {
source.getMasterRpc(),
source.getTopic(),
source.getSerializationType(),
- source.getGroupId(),
+ source.getConsumeGroup(),
source.getSessionKey(),
- source.getTid(),
+ source.getStreamId(),
source.getInnerFormat());
}
}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
index 0a0e49127f..a295b0329e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
@@ -52,7 +52,7 @@ public class TubeMQSource extends StreamSource {
private String topic;
@ApiModelProperty("Group of the TubeMQ")
- private String groupId;
+ private String consumeGroup;
@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;
@@ -61,10 +61,10 @@ public class TubeMQSource extends StreamSource {
private String innerFormat;
/**
- * The TubeMQ consumers use this tid set to filter records reading from
server.
+ * The TubeMQ consumers use this streamId set to filter records reading
from server.
*/
@ApiModelProperty("Tid of the TubeMQ")
- private TreeSet<String> tid;
+ private TreeSet<String> streamId;
public TubeMQSource() {
this.setSourceType(SourceType.TUBEMQ);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 7fc56539d9..df7af84b09 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -107,7 +107,7 @@ public class TubeMQSourceOperator extends
AbstractSourceOperator {
String streamId = streamInfo.getInlongStreamId();
tubeMQSource.setSourceName(streamId);
tubeMQSource.setTopic(groupInfo.getMqResource());
- tubeMQSource.setGroupId(streamId);
+ tubeMQSource.setConsumeGroup(streamId);
tubeMQSource.setMasterRpc(masterRpc);
tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
index 871e3eba41..eee84f1186 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/TubeMQConstant.java
@@ -24,7 +24,7 @@ public class TubeMQConstant {
public static final String TOPIC = "topic";
- public static final String GROUP_ID = "group.id";
+ public static final String CONSUME_GROUP = "consume.group";
public static final String CONNECTOR = "connector";
@@ -38,9 +38,9 @@ public class TubeMQConstant {
public static final String SESSION_KEY = "session.key";
/**
- * The tubemq consumers use this tid set to filter records reading from
server.
+ * The tubemq consumers use this streamId set to filter records reading
from server.
*/
- public static final String TID = "tid";
+ public static final String STREAMID = "stream.id";
public static final String CONSUMER_STARTUP_MODE = "consumer.startup.mode";
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
index bbcdbbecd5..4c55adb3de 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java
@@ -61,17 +61,17 @@ public class TubeMQExtractNode extends ExtractNode
implements Serializable {
private String format;
@Nonnull
- @JsonProperty("groupId")
- private String groupId;
+ @JsonProperty("consumeGroup")
+ private String consumeGroup;
@JsonProperty("sessionKey")
private String sessionKey;
/**
- * The tubemq consumers use this tid set to filter records reading from
server.
+ * The tubemq consumers use this streamId set to filter records reading
from server.
*/
- @JsonProperty("tid")
- private TreeSet<String> tid;
+ @JsonProperty("streamId")
+ private TreeSet<String> streamId;
@JsonProperty("inlong-msg.inner.format")
private String innerFormat;
@@ -86,18 +86,18 @@ public class TubeMQExtractNode extends ExtractNode
implements Serializable {
@Nonnull @JsonProperty("masterRpc") String masterRpc,
@Nonnull @JsonProperty("topic") String topic,
@Nonnull @JsonProperty("format") String format,
- @Nonnull @JsonProperty("groupId") String groupId,
+ @Nonnull @JsonProperty("consumeGroup") String consumeGroup,
@JsonProperty("sessionKey") String sessionKey,
- @JsonProperty("tid") TreeSet<String> tid,
+ @JsonProperty("streamId") TreeSet<String> streamId,
@JsonProperty("inlong-msg.inner.format") String innerFormat) {
super(id, name, fields, waterMarkField, properties);
this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ
masterRpc is null");
this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null");
this.format = Preconditions.checkNotNull(format, "Format is null");
- this.groupId = Preconditions.checkNotNull(groupId, "Group id is null");
+ this.consumeGroup = Preconditions.checkNotNull(consumeGroup, "Group id
is null");
this.sessionKey = sessionKey;
+ this.streamId = streamId;
this.innerFormat = innerFormat;
- this.tid = tid;
}
@Override
@@ -106,15 +106,15 @@ public class TubeMQExtractNode extends ExtractNode
implements Serializable {
map.put(TubeMQConstant.CONNECTOR, TubeMQConstant.TUBEMQ);
map.put(TubeMQConstant.TOPIC, topic);
map.put(TubeMQConstant.MASTER_RPC, masterRpc);
- map.put(TubeMQConstant.GROUP_ID, groupId);
+ map.put(TubeMQConstant.CONSUME_GROUP, consumeGroup);
map.put(TubeMQConstant.FORMAT, format);
map.put(TubeMQConstant.SESSION_KEY, sessionKey);
if (format.startsWith(INLONG_MSG)) {
map.put(TubeMQConstant.INNER_FORMAT, innerFormat);
}
- if (null != tid && !tid.isEmpty()) {
- map.put(TubeMQConstant.TID, StringUtils.concatCsv(tid.toArray(new
String[0]),
+ if (null != streamId && !streamId.isEmpty()) {
+ map.put(TubeMQConstant.STREAMID,
StringUtils.concatCsv(streamId.toArray(new String[0]),
',', null, null));
}
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 50b41d6833..c463785605 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -245,6 +245,12 @@
<profile>
<id>v1.15</id>
<dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-tubemq-v1.15</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-postgres-cdc-v1.15</artifactId>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 7101a475a7..8aad010fba 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -44,11 +44,11 @@ import java.util.TreeSet;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_ID;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TID;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN;
import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties;
@@ -169,8 +169,8 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FORMAT);
options.add(TOPIC);
- options.add(GROUP_ID);
- options.add(TID);
+ options.add(CONSUME_GROUP);
+ options.add(STREAMID);
options.add(SESSION_KEY);
options.add(BOOTSTRAP_FROM_MAX);
options.add(TOPIC_PATTERN);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
index ba069094bb..357da2dd09 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -87,6 +87,13 @@ public class TubeMQOptions {
// TubeMQ specific options
//
--------------------------------------------------------------------------------------------
+ public static final ConfigOption<String> INNER_FORMAT =
+ ConfigOptions.key("inlong-msg.inner.format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Inner format");
+
public static final ConfigOption<String> TOPIC =
ConfigOptions.key("topic")
.stringType()
@@ -109,8 +116,8 @@ public class TubeMQOptions {
.noDefaultValue()
.withDescription("Required TubeMQ master connection
string");
- public static final ConfigOption<String> GROUP_ID =
- ConfigOptions.key("group.id")
+ public static final ConfigOption<String> CONSUME_GROUP =
+ ConfigOptions.key("consume.group")
.stringType()
.noDefaultValue()
.withDescription(
@@ -140,8 +147,8 @@ public class TubeMQOptions {
.defaultValue("default_session_key")
.withDescription("The session key for this consumer group
at startup.");
- public static final ConfigOption<List<String>> TID =
- ConfigOptions.key("topic.tid")
+ public static final ConfigOption<List<String>> STREAMID =
+ ConfigOptions.key("stream.id")
.stringType()
.asList()
.noDefaultValue()
@@ -385,7 +392,7 @@ public class TubeMQOptions {
public static TreeSet<String> getTiSet(ReadableConfig tableOptions) {
TreeSet<String> set = new TreeSet<>();
- tableOptions.getOptional(TID).ifPresent(new Consumer<List<String>>() {
+ tableOptions.getOptional(STREAMID).ifPresent(new
Consumer<List<String>>() {
@Override
public void accept(List<String> strings) {
@@ -396,7 +403,7 @@ public class TubeMQOptions {
}
public static String getConsumerGroup(ReadableConfig tableOptions) {
- return tableOptions.getOptional(GROUP_ID).orElse(null);
+ return tableOptions.getOptional(CONSUME_GROUP).orElse(null);
}
public static String getSessionKey(ReadableConfig tableOptions) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index e01c3278fa..501d39c637 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -130,6 +130,7 @@
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
+ <version>${protobuf.maven.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<!-- Currently Flink azure test pipeline would first
pre-compile and then upload the compiled
@@ -157,6 +158,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
+ <version>${build.helper.maven.version}</version>
<executions>
<execution>
<id>add-test-source</id>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index b890561681..0180f90e02 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -80,9 +80,9 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
private final String topic;
/**
- * The tubemq consumers use this tid set to filter records reading from
server.
+ * The tubemq consumers use this streamId set to filter records reading
from server.
*/
- private final TreeSet<String> tidSet;
+ private final TreeSet<String> streamIdSet;
/**
* The consumer group name.
@@ -147,7 +147,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
*
* @param masterAddress the master address of TubeMQ
* @param topic the topic name
- * @param tidSet the topic's filter condition items
+ * @param streamIdSet the topic's filter condition items
* @param consumerGroup the consumer group name
* @param deserializationSchema the deserialize schema
* @param configuration the configure
@@ -156,7 +156,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
public FlinkTubeMQConsumer(
String masterAddress,
String topic,
- TreeSet<String> tidSet,
+ TreeSet<String> streamIdSet,
String consumerGroup,
DeserializationSchema<T> deserializationSchema,
Configuration configuration,
@@ -164,14 +164,14 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
Boolean innerFormat) {
checkNotNull(masterAddress, "The master address must not be null.");
checkNotNull(topic, "The topic must not be null.");
- checkNotNull(tidSet, "The tid set must not be null.");
+ checkNotNull(streamIdSet, "The streamId set must not be null.");
checkNotNull(consumerGroup, "The consumer group must not be null.");
checkNotNull(deserializationSchema, "The deserialization schema must
not be null.");
checkNotNull(configuration, "The configuration must not be null.");
this.masterAddress = masterAddress;
this.topic = topic;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.deserializationSchema = deserializationSchema;
this.sessionKey = sessionKey;
@@ -217,7 +217,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
messagePullConsumer =
messageSessionFactory.createPullConsumer(consumerConfig);
- messagePullConsumer.subscribe(topic, tidSet);
+ messagePullConsumer.subscribe(topic, streamIdSet);
messagePullConsumer.completeSubscribe(sessionKey, numTasks, true,
currentOffsets);
running = true;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
index fb2f624961..3f04594b67 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
@@ -60,9 +60,9 @@ public class FlinkTubeMQProducer<T> extends
RichSinkFunction<T> implements Check
private final String topic;
/**
- * The tubemq consumers use this tid set to filter records reading from
server.
+ * The tubemq consumers use this streamId set to filter records reading
from server.
*/
- private final TreeSet<String> tidSet;
+ private final TreeSet<String> streamIdSet;
/**
* The serializer for the records sent to tube.
*/
@@ -86,12 +86,12 @@ public class FlinkTubeMQProducer<T> extends
RichSinkFunction<T> implements Check
public FlinkTubeMQProducer(String topic,
String masterAddress,
SerializationSchema<T> serializationSchema,
- TreeSet<String> tidSet,
+ TreeSet<String> streamIdSet,
Configuration configuration) {
checkNotNull(topic, "The topic must not be null.");
checkNotNull(masterAddress, "The master address must not be null.");
checkNotNull(serializationSchema, "The serialization schema must not
be null.");
- checkNotNull(tidSet, "The tid set must not be null.");
+ checkNotNull(streamIdSet, "The streamId set must not be null.");
checkNotNull(configuration, "The configuration must not be null.");
int max_retries = configuration.getInteger(TubeMQOptions.MAX_RETRIES);
@@ -100,7 +100,7 @@ public class FlinkTubeMQProducer<T> extends
RichSinkFunction<T> implements Check
this.topic = topic;
this.masterAddress = masterAddress;
this.serializationSchema = serializationSchema;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.maxRetries = max_retries;
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index f6130b5288..81b6709418 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -47,7 +47,7 @@ import java.util.TreeSet;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_NAME;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
@@ -220,7 +220,7 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FORMAT);
options.add(TOPIC);
- options.add(GROUP_NAME);
+ options.add(CONSUME_GROUP);
options.add(STREAMID);
options.add(SESSION_KEY);
options.add(BOOTSTRAP_FROM_MAX);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
index 76f85f0563..0b4ea93978 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -106,8 +106,8 @@ public class TubeMQOptions {
.noDefaultValue()
.withDescription("Required TubeMQ master connection
string");
- public static final ConfigOption<String> GROUP_NAME =
- ConfigOptions.key("group.name")
+ public static final ConfigOption<String> CONSUME_GROUP =
+ ConfigOptions.key("consume.group")
.stringType()
.noDefaultValue()
.withDescription(
@@ -138,7 +138,7 @@ public class TubeMQOptions {
.withDescription("The session key for this consumer group
at startup.");
public static final ConfigOption<List<String>> STREAMID =
- ConfigOptions.key("topic.streamId")
+ ConfigOptions.key("stream.id")
.stringType()
.asList()
.noDefaultValue()
@@ -275,7 +275,7 @@ public class TubeMQOptions {
}
public static String getConsumerGroup(ReadableConfig tableOptions) {
- return tableOptions.getOptional(GROUP_NAME).orElse(null);
+ return tableOptions.getOptional(CONSUME_GROUP).orElse(null);
}
public static String getSessionKey(ReadableConfig tableOptions) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
index 5d2f8c2a4d..cf5109d82f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
@@ -50,9 +50,9 @@ public class TubeMQTableSink implements DynamicTableSink {
*/
private final String masterAddress;
/**
- * The TubeMQ tid filter collection.
+ * The TubeMQ streamId filter collection.
*/
- private final TreeSet<String> tidSet;
+ private final TreeSet<String> streamIdSet;
/**
* The parameters collection for tubemq producer.
*/
@@ -63,20 +63,20 @@ public class TubeMQTableSink implements DynamicTableSink {
EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
String topic,
String masterAddress,
- TreeSet<String> tidSet,
+ TreeSet<String> streamIdSet,
Configuration configuration) {
Preconditions.checkNotNull(valueEncodingFormat, "The serialization
schema must not be null.");
Preconditions.checkNotNull(physicalDataType, "Physical data type must
not be null.");
Preconditions.checkNotNull(topic, "Topic must not be null.");
Preconditions.checkNotNull(masterAddress, "Master address must not be
null.");
Preconditions.checkNotNull(configuration, "The configuration must not
be null.");
- Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+ Preconditions.checkNotNull(streamIdSet, "The streamId set must not be
null.");
this.valueEncodingFormat = valueEncodingFormat;
this.physicalDataType = physicalDataType;
this.topic = topic;
this.masterAddress = masterAddress;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.configuration = configuration;
}
@@ -102,7 +102,7 @@ public class TubeMQTableSink implements DynamicTableSink {
SerializationSchema<RowData> serializationSchema,
Configuration configuration) {
final FlinkTubeMQProducer<RowData> tubeMQProducer =
- new FlinkTubeMQProducer(topic, masterAddress,
serializationSchema, tidSet, configuration);
+ new FlinkTubeMQProducer(topic, masterAddress,
serializationSchema, streamIdSet, configuration);
return tubeMQProducer;
}
@@ -120,7 +120,7 @@ public class TubeMQTableSink implements DynamicTableSink {
valueEncodingFormat,
topic,
masterAddress,
- tidSet,
+ streamIdSet,
configuration);
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index 2dc21ca2e8..e79685ff70 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -84,9 +84,9 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
*/
private final String topic;
/**
- * The TubeMQ tid filter collection.
+ * The TubeMQ streamId filter collection.
*/
- private final TreeSet<String> tidSet;
+ private final TreeSet<String> streamIdSet;
/**
* The TubeMQ consumer group name.
*/
@@ -129,7 +129,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
public TubeMQTableSource(DataType physicalDataType,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
String masterAddress, String topic,
- TreeSet<String> tidSet, String consumerGroup, String sessionKey,
+ TreeSet<String> streamIdSet, String consumerGroup, String
sessionKey,
Configuration configuration, @Nullable WatermarkStrategy<RowData>
watermarkStrategy,
Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean
innerFormat) {
@@ -137,7 +137,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
Preconditions.checkNotNull(valueDecodingFormat, "The deserialization
schema must not be null.");
Preconditions.checkNotNull(masterAddress, "The master address must not
be null.");
Preconditions.checkNotNull(topic, "The topic must not be null.");
- Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+ Preconditions.checkNotNull(streamIdSet, "The streamId set must not be
null.");
Preconditions.checkNotNull(consumerGroup, "The consumer group must not
be null.");
Preconditions.checkNotNull(configuration, "The configuration must not
be null.");
@@ -147,7 +147,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
this.valueDecodingFormat = valueDecodingFormat;
this.masterAddress = masterAddress;
this.topic = topic;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.sessionKey = sessionKey;
this.configuration = configuration;
@@ -182,7 +182,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
public DynamicTableSource copy() {
return new TubeMQTableSource(
physicalDataType, valueDecodingFormat, masterAddress,
- topic, tidSet, consumerGroup, sessionKey, configuration,
+ topic, streamIdSet, consumerGroup, sessionKey, configuration,
watermarkStrategy, proctimeAttribute, ignoreErrors,
innerFormat);
}
@@ -247,7 +247,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
&& Objects.equals(valueDecodingFormat,
that.valueDecodingFormat)
&& Objects.equals(masterAddress, that.masterAddress)
&& Objects.equals(topic, that.topic)
- && Objects.equals(String.valueOf(tidSet),
String.valueOf(that.tidSet))
+ && Objects.equals(String.valueOf(streamIdSet),
String.valueOf(that.streamIdSet))
&& Objects.equals(consumerGroup, that.consumerGroup)
&& Objects.equals(proctimeAttribute, that.proctimeAttribute)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
@@ -260,7 +260,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
valueDecodingFormat,
masterAddress,
topic,
- tidSet,
+ streamIdSet,
consumerGroup,
configuration,
watermarkStrategy,
@@ -302,7 +302,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
final DeserializationSchema<RowData> tubeMQDeserializer = new
DynamicTubeMQDeserializationSchema(
deserialization, metadataConverters, producedTypeInfo,
ignoreErrors);
- final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new
FlinkTubeMQConsumer(masterAddress, topic, tidSet,
+ final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new
FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
consumerGroup, tubeMQDeserializer, configuration, sessionKey,
innerFormat);
return tubeMQConsumer;
}
diff --git a/inlong-tubemq/tubemq-client/pom.xml b/inlong-tubemq/tubemq-client/pom.xml
index b6055071d0..cff6081658 100644
--- a/inlong-tubemq/tubemq-client/pom.xml
+++ b/inlong-tubemq/tubemq-client/pom.xml
@@ -94,6 +94,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
+ <version>${exec.maven.version}</version>
<executions>
<execution>
<id>version</id>
@@ -114,6 +115,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
+ <version>${build.helper.maven.version}</version>
<executions>
<execution>
<goals>
diff --git a/inlong-tubemq/tubemq-server/pom.xml b/inlong-tubemq/tubemq-server/pom.xml
index 6c463a758e..0d2136e0cd 100644
--- a/inlong-tubemq/tubemq-server/pom.xml
+++ b/inlong-tubemq/tubemq-server/pom.xml
@@ -237,6 +237,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
+ <version>${exec.maven.version}</version>
<executions>
<execution>
<id>version</id>