This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 12e1a46dc4 [INLONG-8922][Sort] Add TubeMQ source and sink connector on
flink 1.15 (#9031)
12e1a46dc4 is described below
commit 12e1a46dc43d74acc489798cdd1c6b96b84a2044
Author: Zfancy <[email protected]>
AuthorDate: Mon Oct 16 15:58:08 2023 +0800
[INLONG-8922][Sort] Add TubeMQ source and sink connector on flink 1.15
(#9031)
---
.../src/main/assemblies/sort-connectors-v1.15.xml | 8 +
.../sort-flink-v1.15/sort-connectors/pom.xml | 1 +
.../sort-connectors/tubemq/pom.xml | 107 +++++++
.../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 351 +++++++++++++++++++++
.../inlong/sort/tubemq/FlinkTubeMQProducer.java | 175 ++++++++++
.../table/DynamicTubeMQDeserializationSchema.java | 134 ++++++++
.../table/DynamicTubeMQSerializationSchema.java | 35 ++
.../tubemq/table/TubeMQDynamicTableFactory.java | 233 ++++++++++++++
.../inlong/sort/tubemq/table/TubeMQOptions.java | 289 +++++++++++++++++
.../inlong/sort/tubemq/table/TubeMQTableSink.java | 131 ++++++++
.../sort/tubemq/table/TubeMQTableSource.java | 341 ++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 16 +
12 files changed, 1821 insertions(+)
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
index 90dfa48893..d71d98dbfa 100644
--- a/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.15.xml
@@ -83,5 +83,13 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
+ <!-- Copies the TubeMQ connector jar built for Flink 1.15 from its module target directory into inlong-sort/connectors. -->
+ <fileSet>
+
<directory>../inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/target</directory>
+ <outputDirectory>inlong-sort/connectors</outputDirectory>
+ <includes>
+
<include>sort-connector-tubemq-v1.15-${project.version}.jar</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
</fileSets>
</assembly>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index 82a4b2949e..151ed4757b 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -39,6 +39,7 @@
<module>iceberg</module>
<module>mongodb-cdc</module>
<module>pulsar</module>
+ <module>tubemq</module>
</modules>
<properties>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
new file mode 100644
index 0000000000..ba5cce64a9
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors-v1.15</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-connector-tubemq-v1.15</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort-connector-tubemq</name>
+
+ <properties>
+ <!-- Base directory of the project five parent levels above this module; presumably the InLong repository root (verify). -->
+
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <!-- TubeMQ client, pinned to the same version as this module. -->
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- No version is declared here for the next two dependencies; presumably it is managed by a parent POM (verify). -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Builds the shaded connector jar during the package phase. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${plugin.shade.version}</version>
+
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+
+ <configuration>
+
+ <!-- Only org.apache.inlong and com.fasterxml.* artifacts are bundled into the shaded jar. -->
+ <!-- NOTE(review): com.google.guava is declared as a dependency but is not in this artifact set; confirm Guava is available on the runtime classpath. -->
+ <artifactSet>
+ <includes>
+ <include>org.apache.inlong:*</include>
+ <include>com.fasterxml.*:*</include>
+ </includes>
+ </artifactSet>
+
+ <filters>
+ <!-- From sort-connector-* artifacts keep only the InLong classes and the Flink table Factory service file. -->
+ <filter>
+
<artifact>org.apache.inlong:sort-connector-*</artifact>
+ <includes>
+ <include>org/apache/inlong/**</include>
+
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+ </includes>
+ </filter>
+ <!-- Drop signature files from every bundled artifact. -->
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <!-- Relocates org.apache.inlong.sort.base under a tubemq-specific shaded package. -->
+ <relocations>
+ <relocation>
+
<pattern>org.apache.inlong.sort.base</pattern>
+
<shadedPattern>org.apache.inlong.sort.tubemq.shaded.org.apache.inlong.sort.base</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
new file mode 100644
index 0000000000..b890561681
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq;
+
+import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.TimeUtils.parseDuration;
+
+/**
+ * The Flink TubeMQ Consumer.
+ *
+ * <p>Pulls messages of a single topic from TubeMQ with a {@link PullMessageConsumer}, deserializes them
+ * and emits the resulting records. The offset last reported for every partition key is tracked in
+ * {@link #currentOffsets} and written to operator list state on each checkpoint.
+ *
+ * @param <T> The type of records produced by this data source
+ */
+public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
+        implements
+            CheckpointedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
+    private static final String TUBE_OFFSET_STATE = "tube-offset-state";
+
+    /**
+     * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
+     */
+    private final String masterAddress;
+
+    /**
+     * The topic name.
+     */
+    private final String topic;
+
+    /**
+     * The tubemq consumers use this tid set to filter records reading from server.
+     */
+    private final TreeSet<String> tidSet;
+
+    /**
+     * The consumer group name.
+     */
+    private final String consumerGroup;
+
+    /**
+     * The deserializer for records.
+     */
+    private final DeserializationSchema<T> deserializationSchema;
+
+    /**
+     * The random key for TubeMQ consumer group when startup.
+     */
+    private final String sessionKey;
+
+    /**
+     * True if consuming message from max offset.
+     */
+    private final boolean consumeFromMax;
+
+    /**
+     * The time to wait if tubemq broker returns message not found.
+     */
+    private final Duration messageNotFoundWaitPeriod;
+
+    /**
+     * The maximum time without a successful fetch before the source is marked as temporarily idle.
+     */
+    private final Duration maxIdleTime;
+
+    /**
+     * True if messages are in the InLong inner format; such messages are deserialized through the
+     * collector-based {@code deserialize} variant.
+     */
+    private final boolean innerFormat;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     **/
+    private volatile boolean running;
+
+    /**
+     * The state for the offsets of queues.
+     */
+    private transient ListState<Tuple2<String, Long>> offsetsState;
+
+    /**
+     * The current offsets of partitions which are stored in {@link #offsetsState}
+     * once a checkpoint is triggered.
+     * <p>
+     * NOTE: The offsets are populated in the main thread and saved in the
+     * checkpoint thread. Its usage must be guarded by the checkpoint lock.</p>
+     */
+    private transient Map<String, Long> currentOffsets;
+
+    /**
+     * The TubeMQ session factory.
+     */
+    private transient TubeSingleSessionFactory messageSessionFactory;
+
+    /**
+     * The TubeMQ pull consumer.
+     */
+    private transient PullMessageConsumer messagePullConsumer;
+
+    /**
+     * Build a TubeMQ source function
+     *
+     * @param masterAddress the master address of TubeMQ
+     * @param topic the topic name
+     * @param tidSet the topic's filter condition items
+     * @param consumerGroup the consumer group name
+     * @param deserializationSchema the deserialize schema
+     * @param configuration the configure
+     * @param sessionKey the tube session key
+     * @param innerFormat whether the messages use the InLong inner format; must not be null
+     */
+    public FlinkTubeMQConsumer(
+            String masterAddress,
+            String topic,
+            TreeSet<String> tidSet,
+            String consumerGroup,
+            DeserializationSchema<T> deserializationSchema,
+            Configuration configuration,
+            String sessionKey,
+            Boolean innerFormat) {
+        checkNotNull(masterAddress, "The master address must not be null.");
+        checkNotNull(topic, "The topic must not be null.");
+        checkNotNull(tidSet, "The tid set must not be null.");
+        checkNotNull(consumerGroup, "The consumer group must not be null.");
+        checkNotNull(deserializationSchema, "The deserialization schema must not be null.");
+        checkNotNull(configuration, "The configuration must not be null.");
+        // The boxed flag is unboxed below; fail with a clear message instead of a bare NPE.
+        checkNotNull(innerFormat, "The inner format flag must not be null.");
+
+        this.masterAddress = masterAddress;
+        this.topic = topic;
+        this.tidSet = tidSet;
+        this.consumerGroup = consumerGroup;
+        this.deserializationSchema = deserializationSchema;
+        this.sessionKey = sessionKey;
+
+        // those param set default
+        this.consumeFromMax = configuration.getBoolean(TubeMQOptions.BOOTSTRAP_FROM_MAX);
+        this.messageNotFoundWaitPeriod = parseDuration(configuration.getString(
+                TubeMQOptions.MESSAGE_NOT_FOUND_WAIT_PERIOD));
+        this.maxIdleTime = parseDuration(configuration.getString(
+                TubeMQOptions.SOURCE_MAX_IDLE_TIME));
+        this.innerFormat = innerFormat;
+    }
+
+    /**
+     * Registers the offset list state and, when restoring, loads the stored offsets into
+     * {@link #currentOffsets}.
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        TypeInformation<Tuple2<String, Long>> typeInformation =
+                new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO);
+        ListStateDescriptor<Tuple2<String, Long>> stateDescriptor =
+                new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation);
+
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+        offsetsState = stateStore.getListState(stateDescriptor);
+        currentOffsets = new HashMap<>();
+        if (context.isRestored()) {
+            for (Tuple2<String, Long> tubeOffset : offsetsState.get()) {
+                currentOffsets.put(tubeOffset.f0, tubeOffset.f1);
+            }
+            LOG.info("Successfully restore the offsets {}.", currentOffsets);
+        } else {
+            LOG.info("No restore offsets.");
+        }
+    }
+
+    /**
+     * Creates the TubeMQ session factory and pull consumer, subscribes to the topic and completes the
+     * subscription with the session key, the parallelism and the current offsets.
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup);
+        consumerConfig.setConsumePosition(consumeFromMax
+                ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS
+                : ConsumePosition.CONSUMER_FROM_FIRST_OFFSET);
+
+        consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
+
+        final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+        messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
+        messagePullConsumer.subscribe(topic, tidSet);
+        messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets);
+
+        running = true;
+    }
+
+    /**
+     * Fetch loop: pulls a batch, deserializes it, emits the records and updates the partition offset
+     * under the checkpoint lock, then confirms the batch to TubeMQ.
+     */
+    @Override
+    public void run(SourceContext<T> ctx) throws Exception {
+
+        Instant lastConsumeInstant = Instant.now();
+
+        while (running) {
+            ConsumerResult consumeResult = messagePullConsumer.getMessage();
+            if (!consumeResult.isSuccess()) {
+                if (!isExpectedFailure(consumeResult)) {
+                    LOG.info("Could not consume messages from tubemq (errcode: {},errmsg: {}).",
+                            consumeResult.getErrCode(),
+                            consumeResult.getErrMsg());
+                }
+
+                Duration idleTime = Duration.between(lastConsumeInstant, Instant.now());
+                if (idleTime.compareTo(maxIdleTime) > 0) {
+                    ctx.markAsTemporarilyIdle();
+                }
+
+                continue;
+            }
+
+            List<Message> messageList = consumeResult.getMessageList();
+
+            List<T> records = new ArrayList<>();
+            lastConsumeInstant = getRecords(Instant.now(), messageList, records);
+
+            // Emission and offset update must be atomic with respect to checkpoints.
+            synchronized (ctx.getCheckpointLock()) {
+
+                for (T record : records) {
+                    ctx.collect(record);
+                }
+                currentOffsets.put(
+                        consumeResult.getPartitionKey(),
+                        consumeResult.getCurrOffset());
+            }
+
+            ConsumerResult confirmResult = messagePullConsumer
+                    .confirmConsume(consumeResult.getConfirmContext(), true);
+            if (!confirmResult.isSuccess() && !isExpectedFailure(confirmResult)) {
+                LOG.warn("Could not confirm messages to tubemq (errcode: {},errmsg: {}).",
+                        confirmResult.getErrCode(),
+                        confirmResult.getErrMsg());
+            }
+        }
+    }
+
+    /**
+     * Returns true if the failed result carries one of the error codes that are not logged.
+     */
+    private static boolean isExpectedFailure(ConsumerResult result) {
+        return result.getErrCode() == TErrCodeConstants.BAD_REQUEST
+                || result.getErrCode() == TErrCodeConstants.NOT_FOUND
+                || result.getErrCode() == TErrCodeConstants.ALL_PARTITION_FROZEN
+                || result.getErrCode() == TErrCodeConstants.NO_PARTITION_ASSIGNED
+                || result.getErrCode() == TErrCodeConstants.ALL_PARTITION_WAITING
+                || result.getErrCode() == TErrCodeConstants.ALL_PARTITION_INUSE;
+    }
+
+    /**
+     * Deserializes the given messages into {@code records}.
+     *
+     * @param lastConsumeInstant the instant returned unchanged when {@code messageList} is null
+     * @param messageList the fetched messages, may be null
+     * @param records the list the deserialized records are appended to
+     * @return the current instant if {@code messageList} is not null, otherwise {@code lastConsumeInstant}
+     */
+    @SuppressWarnings("unchecked")
+    private Instant getRecords(Instant lastConsumeInstant, List<Message> messageList, List<T> records)
+            throws Exception {
+        if (messageList != null) {
+            lastConsumeInstant = Instant.now();
+            if (!innerFormat) {
+                for (Message message : messageList) {
+                    T record = deserializationSchema.deserialize(message.getData());
+                    records.add(record);
+                }
+            } else {
+                // Inner-format messages go through the collector variant of deserialize.
+                List<RowData> rowDataList = new ArrayList<>();
+                ListCollector<RowData> out = new ListCollector<>(rowDataList);
+                for (Message message : messageList) {
+                    deserializationSchema.deserialize(message.getData(), (Collector<T>) out);
+                }
+                rowDataList.forEach(data -> records.add((T) data));
+            }
+        }
+
+        return lastConsumeInstant;
+
+    }
+
+    /**
+     * Replaces the content of the offset state with the entries of {@link #currentOffsets}.
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+        offsetsState.clear();
+        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
+            offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue()));
+        }
+
+        LOG.info("Successfully save the offsets in checkpoint {}: {}.",
+                context.getCheckpointId(), currentOffsets);
+    }
+
+    /**
+     * Clears the running flag so that the loop in {@link #run} exits.
+     */
+    @Override
+    public void cancel() {
+        running = false;
+    }
+
+    /**
+     * Stops the fetch loop and shuts down the pull consumer and the session factory; shutdown
+     * failures are logged and do not prevent the remaining cleanup.
+     */
+    @Override
+    public void close() throws Exception {
+
+        cancel();
+
+        if (messagePullConsumer != null) {
+            try {
+                messagePullConsumer.shutdown();
+            } catch (Throwable t) {
+                LOG.warn("Could not properly shutdown the tubemq pull consumer.", t);
+            }
+        }
+
+        if (messageSessionFactory != null) {
+            try {
+                messageSessionFactory.shutdown();
+            } catch (Throwable t) {
+                LOG.warn("Could not properly shutdown the tubemq session factory.", t);
+            }
+        }
+
+        super.close();
+
+        LOG.info("Closed the tubemq source.");
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
new file mode 100644
index 0000000000..fb2f624961
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQProducer.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq;
+
+import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.client.producer.MessageProducer;
+import org.apache.inlong.tubemq.client.producer.MessageSentResult;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.TreeSet;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements
CheckpointedFunction {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkTubeMQProducer.class);
+
+ private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm";
+
+ /**
+ * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
+ */
+ private final String masterAddress;
+
+ /**
+ * The topic name.
+ */
+ private final String topic;
+
+ /**
+ * The tubemq consumers use this tid set to filter records reading from
server.
+ */
+ private final TreeSet<String> tidSet;
+ /**
+ * The serializer for the records sent to tube.
+ */
+ private final SerializationSchema<T> serializationSchema;
+
+ /**
+ * The tubemq producer.
+ */
+ private transient MessageProducer producer;
+
+ /**
+ * The tubemq session factory.
+ */
+ private transient MessageSessionFactory sessionFactory;
+
+ /**
+ * The maximum number of retries.
+ */
+ private final int maxRetries;
+
+ public FlinkTubeMQProducer(String topic,
+ String masterAddress,
+ SerializationSchema<T> serializationSchema,
+ TreeSet<String> tidSet,
+ Configuration configuration) {
+ checkNotNull(topic, "The topic must not be null.");
+ checkNotNull(masterAddress, "The master address must not be null.");
+ checkNotNull(serializationSchema, "The serialization schema must not
be null.");
+ checkNotNull(tidSet, "The tid set must not be null.");
+ checkNotNull(configuration, "The configuration must not be null.");
+
+ int max_retries = configuration.getInteger(TubeMQOptions.MAX_RETRIES);
+ checkArgument(max_retries > 0);
+
+ this.topic = topic;
+ this.masterAddress = masterAddress;
+ this.serializationSchema = serializationSchema;
+ this.tidSet = tidSet;
+ this.maxRetries = max_retries;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
{
+ // Nothing to do.
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext
functionInitializationContext) {
+ // Nothing to do.
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ TubeClientConfig tubeClientConfig = new
TubeClientConfig(masterAddress);
+ this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig);
+ this.producer = sessionFactory.createProducer();
+ HashSet<String> hashSet = new HashSet<>();
+ hashSet.add(topic);
+ producer.publish(hashSet);
+ }
+
+ @Override
+ public void invoke(T in, SinkFunction.Context context) throws Exception {
+
+ int retries = 0;
+ Exception exception = null;
+
+ while (maxRetries <= 0 || retries < maxRetries) {
+
+ try {
+ byte[] body = serializationSchema.serialize(in);
+ Message message = new Message(topic, body);
+ MessageSentResult sendResult = producer.sendMessage(message);
+ if (sendResult.isSuccess()) {
+ return;
+ } else {
+ LOG.warn("Send msg fail, error code: {}, error message:
{}",
+ sendResult.getErrCode(), sendResult.getErrMsg());
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not properly send the message to tube "
+ + "(retries: {}).", retries, e);
+
+ retries++;
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+ }
+
+ throw new IOException("Could not properly send the message to tube.",
exception);
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ try {
+ if (producer != null) {
+ producer.shutdown();
+ }
+ if (sessionFactory != null) {
+ sessionFactory.shutdown();
+ }
+ } catch (Throwable e) {
+ LOG.error("Shutdown producer error", e);
+ } finally {
+ super.close();
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
new file mode 100644
index 0000000000..f5880f1a78
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.tubemq.corebase.Message;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class DynamicTubeMQDeserializationSchema implements
DeserializationSchema<RowData> {
+
+ /**
+ * data buffer message
+ */
+ private final DeserializationSchema<RowData> deserializationSchema;
+
+ /**
+ * {@link MetadataConverter} of how to produce metadata from message.
+ */
+ private final MetadataConverter[] metadataConverters;
+
+ /**
+ * {@link TypeInformation} of the produced {@link RowData} (physical +
meta data).
+ */
+ private final TypeInformation<RowData> producedTypeInfo;
+
+ /**
+ * status of error
+ */
+ private final boolean ignoreErrors;
+
+ public DynamicTubeMQDeserializationSchema(
+ DeserializationSchema<RowData> schema,
+ MetadataConverter[] metadataConverters,
+ TypeInformation<RowData> producedTypeInfo,
+ boolean ignoreErrors) {
+ this.deserializationSchema = schema;
+ this.metadataConverters = metadataConverters;
+ this.producedTypeInfo = producedTypeInfo;
+ this.ignoreErrors = ignoreErrors;
+ }
+
+ @Override
+ public RowData deserialize(byte[] bytes) throws IOException {
+ return deserializationSchema.deserialize(bytes);
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<RowData> out) throws
IOException {
+ deserializationSchema.deserialize(message, out);
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData rowData) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DynamicTubeMQDeserializationSchema)) {
+ return false;
+ }
+ DynamicTubeMQDeserializationSchema that =
(DynamicTubeMQDeserializationSchema) o;
+ return ignoreErrors == that.ignoreErrors
+ &&
Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
+
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
+ && Objects.equal(deserializationSchema,
that.deserializationSchema)
+ && Objects.equal(producedTypeInfo, that.producedTypeInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(deserializationSchema, metadataConverters,
producedTypeInfo, ignoreErrors);
+ }
+
+ /**
+ * add metadata column
+ */
+ private void emitRow(Message head, GenericRowData physicalRow,
Collector<RowData> out) {
+ if (metadataConverters.length == 0) {
+ out.collect(physicalRow);
+ return;
+ }
+ final int physicalArity = physicalRow.getArity();
+ final int metadataArity = metadataConverters.length;
+ final GenericRowData producedRow =
+ new GenericRowData(physicalRow.getRowKind(), physicalArity +
metadataArity);
+ for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+ producedRow.setField(physicalPos,
physicalRow.getField(physicalPos));
+ }
+ for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+ producedRow.setField(
+ physicalArity + metadataPos,
metadataConverters[metadataPos].read(head));
+ }
+ out.collect(producedRow);
+ }
+
+ interface MetadataConverter extends Serializable {
+
+ Object read(Message head);
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java
new file mode 100644
index 0000000000..a1f95fcdd6
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQSerializationSchema.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * A {@link SerializationSchema} for TubeMQ table sinks that delegates to a wrapped schema.
+ */
+public class DynamicTubeMQSerializationSchema implements SerializationSchema<RowData> {
+
+    /**
+     * The wrapped schema that performs the actual serialization.
+     */
+    private final SerializationSchema<RowData> serializationSchema;
+
+    public DynamicTubeMQSerializationSchema(SerializationSchema<RowData> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    /**
+     * Forwards initialization to the wrapped schema so that it is opened together with this wrapper.
+     */
+    @Override
+    public void open(SerializationSchema.InitializationContext context) throws Exception {
+        serializationSchema.open(context);
+    }
+
+    @Override
+    public byte[] serialize(RowData element) {
+        return serializationSchema.serialize(element);
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
new file mode 100644
index 0000000000..17275d8d11
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_NAME;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN;
+import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties;
+
+/**
+ * A dynamic table factory implementation for TubeMQ.
+ */
+public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "tubemq";
+
+ public static final List<String> INNERFORMATTYPE =
Arrays.asList("inlong-msg");
+
+ public static boolean innerFormat = false;
+
+ private static DecodingFormat<DeserializationSchema<RowData>>
getValueDecodingFormat(
+ TableFactoryHelper helper) {
+ return
helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class,
FORMAT)
+ .orElseGet(() ->
helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT));
+ }
+
+ private static EncodingFormat<SerializationSchema<RowData>>
getValueEncodingFormat(
+ TableFactoryHelper helper) {
+ return
helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FORMAT)
+ .orElseGet(() ->
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT));
+ }
+
+ private static void validatePKConstraints(
+ ObjectIdentifier tableName, CatalogTable catalogTable, Format
format) {
+ if (catalogTable.getSchema().getPrimaryKey().isPresent()
+ && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ Configuration options =
Configuration.fromMap(catalogTable.getOptions());
+ String formatName =
options.getOptional(FORMAT).orElse(options.get(FORMAT));
+ innerFormat = INNERFORMATTYPE.contains(formatName);
+ throw new ValidationException(String.format(
+ "The TubeMQ table '%s' with '%s' format doesn't support
defining PRIMARY KEY constraint"
+ + " on the table, because it can't guarantee the
semantic of primary key.",
+ tableName.asSummaryString(), formatName));
+ }
+ }
+
+ private static Optional<DecodingFormat<DeserializationSchema<RowData>>>
getKeyDecodingFormat(
+ TableFactoryHelper helper) {
+ final Optional<DecodingFormat<DeserializationSchema<RowData>>>
keyDecodingFormat = helper
+
.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+ keyDecodingFormat.ifPresent(format -> {
+ if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(String.format(
+ "A key format should only deal with INSERT-only
records. "
+ + "But %s has a changelog mode of %s.",
+ helper.getOptions().get(KEY_FORMAT),
+ format.getChangelogMode()));
+ }
+ });
+ return keyDecodingFormat;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat = getValueDecodingFormat(helper);
+
+ // validate all options
+ helper.validate();
+
+ validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueDecodingFormat);
+
+ final Configuration properties =
getTubeMQProperties(context.getCatalogTable().getOptions());
+
+ final DataType physicalDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+ return createTubeMQTableSource(
+ physicalDataType,
+ valueDecodingFormat,
+ TubeMQOptions.getSourceTopics(tableOptions),
+ TubeMQOptions.getMasterRpcAddress(tableOptions),
+ TubeMQOptions.getTiSet(tableOptions),
+ TubeMQOptions.getConsumerGroup(tableOptions),
+ TubeMQOptions.getSessionKey(tableOptions),
+ properties);
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat
= getValueEncodingFormat(helper);
+
+ // validate all options
+ helper.validate();
+
+ validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueEncodingFormat);
+
+ final Configuration properties =
getTubeMQProperties(context.getCatalogTable().getOptions());
+
+ final DataType physicalDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+
+ return createTubeMQTableSink(
+ physicalDataType,
+ valueEncodingFormat,
+ TubeMQOptions.getSinkTopics(tableOptions),
+ TubeMQOptions.getMasterRpcAddress(tableOptions),
+ TubeMQOptions.getTiSet(tableOptions),
+ properties);
+ }
+
+ protected TubeMQTableSource createTubeMQTableSource(
+ DataType physicalDataType,
+ DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+ String topic,
+ String url,
+ TreeSet<String> streamId,
+ String consumerGroup,
+ String sessionKey,
+ Configuration properties) {
+ return new TubeMQTableSource(
+ physicalDataType,
+ valueDecodingFormat,
+ url,
+ topic,
+ streamId,
+ consumerGroup,
+ sessionKey,
+ properties,
+ null,
+ null,
+ false,
+ innerFormat);
+ }
+
+ protected TubeMQTableSink createTubeMQTableSink(
+ DataType physicalDataType,
+ EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+ String topic,
+ String masterAddress,
+ TreeSet<String> streamId,
+ Configuration configuration) {
+ return new TubeMQTableSink(
+ physicalDataType,
+ valueEncodingFormat,
+ topic,
+ masterAddress,
+ streamId,
+ configuration);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(MASTER_RPC);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FORMAT);
+ options.add(TOPIC);
+ options.add(GROUP_NAME);
+ options.add(STREAMID);
+ options.add(SESSION_KEY);
+ options.add(BOOTSTRAP_FROM_MAX);
+ options.add(TOPIC_PATTERN);
+ return options;
+ }
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
new file mode 100644
index 0000000000..0085100c87
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.description.Description;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/**
+ * Option utils for tubeMQ table source and sink.
+ */
+public class TubeMQOptions {
+
+    // --------------------------------------------------------------------------------------------
+    // Option enumerations
+    // --------------------------------------------------------------------------------------------
+
+    /** Prefix of table options that are passed through as TubeMQ client properties. */
+    public static final String PROPERTIES_PREFIX = "properties.";
+
+    // Start up offset.
+    // Always start from the max consume position.
+    public static final String CONSUMER_FROM_MAX_OFFSET_ALWAYS = "max";
+    // Start from the latest position for the first time. Otherwise start from last consume position.
+    public static final String CONSUMER_FROM_LATEST_OFFSET = "latest";
+    // Start from 0 for the first time. Otherwise start from last consume position.
+    public static final String CONSUMER_FROM_FIRST_OFFSET = "earliest";
+
+    // --------------------------------------------------------------------------------------------
+    // Format options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
+            .key("key." + FORMAT.key())
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Defines the format identifier for encoding key data. "
+                    + "The identifier is used to discover a suitable format factory.");
+
+    public static final ConfigOption<List<String>> KEY_FIELDS =
+            ConfigOptions.key("key.fields")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            "Defines an explicit list of physical columns from the table schema "
+                                    + "that configure the data type for the key format. By default, this list is "
+                                    + "empty and thus a key is undefined.");
+
+    // --------------------------------------------------------------------------------------------
+    // TubeMQ specific options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> TOPIC =
+            ConfigOptions.key("topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Topic names from which the table is read. Either 'topic' "
+                                    + "or 'topic-pattern' must be set for source.");
+
+    public static final ConfigOption<String> TOPIC_PATTERN =
+            ConfigOptions.key("topic-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional topic pattern from which the table is read for source."
+                                    + " Either 'topic' or 'topic-pattern' must be set.");
+
+    public static final ConfigOption<String> MASTER_RPC =
+            ConfigOptions.key("master.rpc")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Required TubeMQ master connection string");
+
+    public static final ConfigOption<String> GROUP_NAME =
+            ConfigOptions.key("group.name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Required consumer group in TubeMQ consumer");
+
+    public static final ConfigOption<String> TUBE_MESSAGE_NOT_FOUND_WAIT_PERIOD =
+            ConfigOptions.key("tubemq.message.not.found.wait.period")
+                    .stringType()
+                    .defaultValue("350ms")
+                    .withDescription("The time of waiting period if "
+                            + "tubeMQ broker return message not found.");
+
+    public static final ConfigOption<Long> TUBE_SUBSCRIBE_RETRY_TIMEOUT =
+            ConfigOptions.key("tubemq.subscribe.retry.timeout")
+                    .longType()
+                    .defaultValue(300000L)
+                    .withDescription("The time of subscribing tubeMQ timeout, in millisecond");
+
+    public static final ConfigOption<Integer> SOURCE_EVENT_QUEUE_CAPACITY =
+            ConfigOptions.key("source.event.queue.capacity")
+                    .intType()
+                    .defaultValue(1024);
+
+    public static final ConfigOption<String> SESSION_KEY =
+            ConfigOptions.key("session.key")
+                    .stringType()
+                    .defaultValue("default_session_key")
+                    .withDescription("The session key for this consumer group at startup.");
+
+    public static final ConfigOption<List<String>> STREAMID =
+            ConfigOptions.key("topic.streamId")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("The streamId owned this topic.");
+
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("max.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription("The maximum number of retries when an "
+                            + "exception is caught.");
+
+    public static final ConfigOption<Boolean> BOOTSTRAP_FROM_MAX =
+            ConfigOptions.key("bootstrap.from.max")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("True if consuming from the most recent "
+                            + "position when the tubemq source starts.. It only takes "
+                            + "effect when the tubemq source does not recover from "
+                            + "checkpoints.");
+
+    public static final ConfigOption<String> SOURCE_MAX_IDLE_TIME =
+            ConfigOptions.key("source.task.max.idle.time")
+                    .stringType()
+                    .defaultValue("5min")
+                    .withDescription("The max time of the source marked as temporarily idle.");
+
+    public static final ConfigOption<String> MESSAGE_NOT_FOUND_WAIT_PERIOD =
+            ConfigOptions.key("message.not.found.wait.period")
+                    .stringType()
+                    .defaultValue("500ms")
+                    .withDescription("The time of waiting period if tubemq broker return message not found.");
+
+    public static final ConfigOption<ValueFieldsStrategy> VALUE_FIELDS_INCLUDE =
+            ConfigOptions.key("value.fields-include")
+                    .enumType(ValueFieldsStrategy.class)
+                    .defaultValue(ValueFieldsStrategy.ALL)
+                    .withDescription(
+                            String.format(
+                                    "Defines a strategy how to deal with key columns in the data type "
+                                            + "of the value format. By default, '%s' physical columns "
+                                            + "of the table schema will be included in the value "
+                                            + "format which means that the key columns "
+                                            + "appear in the data type for both the key and value format.",
+                                    ValueFieldsStrategy.ALL));
+
+    public static final ConfigOption<String> KEY_FIELDS_PREFIX =
+            ConfigOptions.key("key.fields-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines a custom prefix for all fields of the key format to avoid "
+                                                    + "name clashes with fields of the value format. "
+                                                    + "By default, the prefix is empty.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "If a custom prefix is defined, both the table schema "
+                                                            + "and '%s' will work with prefixed names.",
+                                                    KEY_FIELDS.key()))
+                                    .linebreak()
+                                    .text(
+                                            "When constructing the data type of the key format, "
+                                                    + "the prefix will be removed and the "
+                                                    + "non-prefixed names will be used within the key format.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "Please note that this option requires that '%s' must be '%s'.",
+                                                    VALUE_FIELDS_INCLUDE.key(),
+                                                    ValueFieldsStrategy.EXCEPT_KEY))
+                                    .build());
+
+    // --------------------------------------------------------------------------------------------
+    // Validation
+    // --------------------------------------------------------------------------------------------
+
+    /** The three start-up offset identifiers declared above. */
+    private static final Set<String> CONSUMER_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
+            CONSUMER_FROM_MAX_OFFSET_ALWAYS,
+            CONSUMER_FROM_LATEST_OFFSET,
+            CONSUMER_FROM_FIRST_OFFSET));
+
+    /**
+     * Collects every table option whose key starts with {@link #PROPERTIES_PREFIX} into a new
+     * {@link Configuration}, with the prefix stripped from the key.
+     *
+     * <p>Entries are written with {@code setString}: {@code Configuration#toMap()} returns a copy,
+     * so putting into that map (as this method did before) never reached the configuration and the
+     * result was always empty.
+     *
+     * @param tableOptions the raw table options
+     * @return the prefixed options without their prefix; empty if there are none
+     */
+    public static Configuration getTubeMQProperties(Map<String, String> tableOptions) {
+        final Configuration tubeMQProperties = new Configuration();
+        tableOptions.forEach((key, value) -> {
+            // Configuration rejects null values, so such entries are skipped.
+            if (value != null && key.startsWith(PROPERTIES_PREFIX)) {
+                tubeMQProperties.setString(key.substring(PROPERTIES_PREFIX.length()), value);
+            }
+        });
+        return tubeMQProperties;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Scan specific options
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * @return the value of the 'topic' option, or {@code null} if it is not set
+     */
+    public static String getSourceTopics(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(TOPIC).orElse(null);
+    }
+
+    /**
+     * @return the value of the 'topic' option, or {@code null} if it is not set
+     */
+    public static String getSinkTopics(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(TOPIC).orElse(null);
+    }
+
+    /**
+     * @return the value of the 'master.rpc' option, or {@code null} if it is not set
+     */
+    public static String getMasterRpcAddress(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(MASTER_RPC).orElse(null);
+    }
+
+    /**
+     * @return the entries of the 'topic.streamId' option as a sorted set; empty if it is not set
+     */
+    public static TreeSet<String> getTiSet(ReadableConfig tableOptions) {
+        final TreeSet<String> streamIds = new TreeSet<>();
+        final Consumer<List<String>> collector = streamIds::addAll;
+        tableOptions.getOptional(STREAMID).ifPresent(collector);
+        return streamIds;
+    }
+
+    /**
+     * @return the value of the 'group.name' option, or {@code null} if it is not set
+     */
+    public static String getConsumerGroup(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(GROUP_NAME).orElse(null);
+    }
+
+    /**
+     * @return the value of the 'session.key' option, falling back to the option's default value
+     */
+    public static String getSessionKey(ReadableConfig tableOptions) {
+        return tableOptions.get(SESSION_KEY);
+    }
+
+    /**
+     * Strategies to derive the data type of a value format by considering a key format.
+     */
+    public enum ValueFieldsStrategy {
+
+        ALL,
+
+        EXCEPT_KEY
+
+    }
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
new file mode 100644
index 0000000000..5d2f8c2a4d
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSink.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.sort.tubemq.FlinkTubeMQProducer;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.TreeSet;
+
+/**
+ * Dynamic table sink that hands rows to a {@link FlinkTubeMQProducer}.
+ */
+public class TubeMQTableSink implements DynamicTableSink {
+
+    /** Encoding format used to create the runtime serializer for row values. */
+    private final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat;
+
+    /** Physical row type the encoding format is configured with. */
+    private final DataType physicalDataType;
+
+    /** Name of the TubeMQ topic. */
+    private final String topic;
+
+    /** TubeMQ master address list, e.g. 127.0.0.1:8715,127.0.0.2:8715. */
+    private final String masterAddress;
+
+    /** Tid collection passed on to the producer. */
+    private final TreeSet<String> tidSet;
+
+    /** Additional parameters passed on to the producer. */
+    private final Configuration configuration;
+
+    /**
+     * Creates the sink; every argument is mandatory.
+     *
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    public TubeMQTableSink(
+            DataType physicalDataType,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            String topic,
+            String masterAddress,
+            TreeSet<String> tidSet,
+            Configuration configuration) {
+        // Checks run in the same order as before so the first reported violation is unchanged.
+        this.valueEncodingFormat =
+                Preconditions.checkNotNull(valueEncodingFormat, "The serialization schema must not be null.");
+        this.physicalDataType =
+                Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+        this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+        this.masterAddress = Preconditions.checkNotNull(masterAddress, "Master address must not be null.");
+        this.configuration =
+                Preconditions.checkNotNull(configuration, "The configuration must not be null.");
+        this.tidSet = Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+    }
+
+    /**
+     * Returns the changelog mode of the encoding format; the requested mode is not consulted.
+     */
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return valueEncodingFormat.getChangelogMode();
+    }
+
+    /**
+     * Creates the runtime encoder from the format and wraps a producer built with it into a
+     * sink function provider with parallelism 1.
+     */
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final SerializationSchema<RowData> encoder =
+                valueEncodingFormat.createRuntimeEncoder(context, physicalDataType);
+
+        final FlinkTubeMQProducer<RowData> producer =
+                new FlinkTubeMQProducer<>(topic, masterAddress, encoder, tidSet, configuration);
+
+        return SinkFunctionProvider.of(producer, 1);
+    }
+
+    /**
+     * Returns a new sink built from the same field values (references are shared, not cloned).
+     */
+    @Override
+    public DynamicTableSink copy() {
+        return new TubeMQTableSink(
+                physicalDataType,
+                valueEncodingFormat,
+                topic,
+                masterAddress,
+                tidSet,
+                configuration);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TubeMQ table sink";
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
new file mode 100644
index 0000000000..2dc21ca2e8
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq.table;
+
+import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
+import
org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter;
+import org.apache.inlong.tubemq.corebase.Message;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Flink {@link ScanTableSource} that reads rows from a TubeMQ topic.
+ *
+ * <p>Message values are decoded with the configured {@link DecodingFormat}. Connector metadata
+ * (see {@link ReadableMetadata}) and format metadata (exposed with the {@code value.} prefix) can be
+ * requested through {@link SupportsReadingMetadata}.
+ *
+ * <p>NOTE(review): the watermark strategy pushed down through {@link SupportsWatermarkPushDown} and the
+ * proctime attribute are stored on this object but are not handed to the consumer created in
+ * {@link #getScanRuntimeProvider(ScanContext)} - confirm whether that is intended.
+ */
+public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
+
+    private static final String VALUE_METADATA_PREFIX = "value.";
+
+    // --------------------------------------------------------------------------------------------
+    // Format attributes
+    // --------------------------------------------------------------------------------------------
+    /**
+     * Data type to configure the formats.
+     */
+    private final DataType physicalDataType;
+    /**
+     * Format for decoding values from TubeMQ.
+     */
+    private final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
+
+    // --------------------------------------------------------------------------------------------
+    // TubeMQ-specific attributes
+    // --------------------------------------------------------------------------------------------
+    /**
+     * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
+     */
+    private final String masterAddress;
+    /**
+     * The TubeMQ topic name.
+     */
+    private final String topic;
+    /**
+     * The TubeMQ tid filter collection.
+     */
+    private final TreeSet<String> tidSet;
+    /**
+     * The TubeMQ consumer group name.
+     */
+    private final String consumerGroup;
+    /**
+     * The parameters collection for TubeMQ consumer.
+     */
+    private final Configuration configuration;
+    /**
+     * The TubeMQ session key.
+     */
+    private final String sessionKey;
+    /**
+     * Field name of the processing time attribute, empty if no processing time
+     * field is defined.
+     */
+    private final Optional<String> proctimeAttribute;
+    /**
+     * Error-handling flag forwarded to the deserialization schema.
+     */
+    private final boolean ignoreErrors;
+    /**
+     * Whether the InLong inner format is used; forwarded to the consumer.
+     */
+    private final boolean innerFormat;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+    /**
+     * Data type that describes the final output of the source.
+     */
+    protected DataType producedDataType;
+    /**
+     * Metadata that is appended at the end of a physical source row.
+     */
+    protected List<String> metadataKeys;
+    /**
+     * Watermark strategy that is used to generate per-partition watermark.
+     */
+    @Nullable
+    private WatermarkStrategy<RowData> watermarkStrategy;
+
+    /**
+     * Creates a source with no metadata columns applied; the produced type initially equals the
+     * physical type.
+     *
+     * @param physicalDataType    data type used to configure the value format, not null
+     * @param valueDecodingFormat format decoding TubeMQ message values, not null
+     * @param masterAddress       TubeMQ master address, not null
+     * @param topic               TubeMQ topic, not null
+     * @param tidSet              tid filter collection, not null
+     * @param consumerGroup       consumer group name, not null
+     * @param sessionKey          TubeMQ session key
+     * @param configuration       extra consumer options, not null
+     * @param watermarkStrategy   optional watermark strategy
+     * @param proctimeAttribute   optional name of the processing-time attribute
+     * @param ignoreErrors        error-handling flag forwarded to the deserialization schema, not null
+     * @param innerFormat         whether the InLong inner format is used, not null
+     */
+    public TubeMQTableSource(DataType physicalDataType,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            String masterAddress, String topic,
+            TreeSet<String> tidSet, String consumerGroup, String sessionKey,
+            Configuration configuration, @Nullable WatermarkStrategy<RowData> watermarkStrategy,
+            Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) {
+
+        Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+        Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null.");
+        Preconditions.checkNotNull(masterAddress, "The master address must not be null.");
+        Preconditions.checkNotNull(topic, "The topic must not be null.");
+        Preconditions.checkNotNull(tidSet, "The tid set must not be null.");
+        Preconditions.checkNotNull(consumerGroup, "The consumer group must not be null.");
+        Preconditions.checkNotNull(configuration, "The configuration must not be null.");
+        // The boxed flags are unboxed below; fail with a descriptive message instead of a bare NPE.
+        Preconditions.checkNotNull(ignoreErrors, "The ignoreErrors flag must not be null.");
+        Preconditions.checkNotNull(innerFormat, "The innerFormat flag must not be null.");
+
+        this.physicalDataType = physicalDataType;
+        this.producedDataType = physicalDataType;
+        this.metadataKeys = Collections.emptyList();
+        this.valueDecodingFormat = valueDecodingFormat;
+        this.masterAddress = masterAddress;
+        this.topic = topic;
+        this.tidSet = tidSet;
+        this.consumerGroup = consumerGroup;
+        this.sessionKey = sessionKey;
+        this.configuration = configuration;
+        this.watermarkStrategy = watermarkStrategy;
+        this.proctimeAttribute = proctimeAttribute;
+        this.ignoreErrors = ignoreErrors;
+        this.innerFormat = innerFormat;
+    }
+
+    /**
+     * @return the changelog mode declared by the value decoding format
+     */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return valueDecodingFormat.getChangelogMode();
+    }
+
+    /**
+     * Builds the runtime source function: a {@link FlinkTubeMQConsumer} that decodes all physical
+     * fields with the value format.
+     *
+     * @param context planner-provided context
+     * @return an unbounded source function provider
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
+        // The value format decodes every physical column, in declaration order.
+        final int[] valueProjection = IntStream.range(0, physicalFieldCount).toArray();
+        final DeserializationSchema<RowData> deserialization =
+                createDeserialization(context, valueDecodingFormat, valueProjection, null);
+
+        // Use the produced type (physical + requested metadata columns) so that the type
+        // information matches the rows once metadata has been applied. Without metadata it is
+        // identical to the physical type.
+        final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType);
+
+        final FlinkTubeMQConsumer<RowData> tubeMQConsumer =
+                createTubeMQConsumer(deserialization, producedTypeInfo, ignoreErrors);
+
+        return SourceFunctionProvider.of(tubeMQConsumer, false);
+    }
+
+    /**
+     * Creates a copy of this source, including the state set by
+     * {@link #applyReadableMetadata(List, DataType)} and {@link #applyWatermark(WatermarkStrategy)}.
+     *
+     * @return the copied source
+     */
+    @Override
+    public DynamicTableSource copy() {
+        final TubeMQTableSource copy = new TubeMQTableSource(
+                physicalDataType, valueDecodingFormat, masterAddress,
+                topic, tidSet, consumerGroup, sessionKey, configuration,
+                watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat);
+        // The constructor resets these to their defaults; carry the applied metadata over.
+        copy.producedDataType = producedDataType;
+        copy.metadataKeys = metadataKeys;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "TubeMQ table source";
+    }
+
+    /**
+     * Lists format metadata (keys prefixed with {@code value.}) followed by connector metadata.
+     * A connector key is only added when no entry with the same key exists yet.
+     *
+     * @return insertion-ordered map from metadata key to data type
+     */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        valueDecodingFormat
+                .listReadableMetadata()
+                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
+
+        // add connector metadata
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+        return metadataMap;
+    }
+
+    /**
+     * Splits the requested keys into format keys (pushed into the decoding format with the
+     * {@code value.} prefix removed) and connector keys (remembered on this source), and records
+     * the final produced data type.
+     *
+     * @param metadataKeys     requested metadata keys
+     * @param producedDataType final output type of the source
+     */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        // separate connector and format metadata
+        final List<String> formatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+                        .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push down format metadata
+        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
+        if (!formatMetadata.isEmpty()) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream()
+                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
+                            .collect(Collectors.toList());
+            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
+    }
+
+    /**
+     * Two sources are equal when all of their configuration and applied-ability state is equal.
+     * Compares exactly the fields hashed by {@link #hashCode()}.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final TubeMQTableSource that = (TubeMQTableSource) o;
+        return ignoreErrors == that.ignoreErrors
+                && innerFormat == that.innerFormat
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+                && Objects.equals(masterAddress, that.masterAddress)
+                && Objects.equals(topic, that.topic)
+                && Objects.equals(tidSet, that.tidSet)
+                && Objects.equals(consumerGroup, that.consumerGroup)
+                && Objects.equals(sessionKey, that.sessionKey)
+                && Objects.equals(configuration, that.configuration)
+                && Objects.equals(proctimeAttribute, that.proctimeAttribute)
+                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalDataType,
+                producedDataType,
+                metadataKeys,
+                valueDecodingFormat,
+                masterAddress,
+                topic,
+                tidSet,
+                consumerGroup,
+                sessionKey,
+                configuration,
+                watermarkStrategy,
+                proctimeAttribute,
+                ignoreErrors,
+                innerFormat);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime helpers
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates the runtime decoder for the projected physical fields.
+     *
+     * @param context    planner-provided context
+     * @param format     decoding format, may be null
+     * @param projection indices of the physical fields the format should decode
+     * @param prefix     optional field-name prefix to strip from the projected row type
+     * @return the runtime decoder, or null when {@code format} is null
+     */
+    @Nullable
+    private DeserializationSchema<RowData> createDeserialization(
+            Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType = DataTypeUtils.projectRow(this.physicalDataType, projection);
+        if (prefix != null) {
+            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeDecoder(context, physicalFormatDataType);
+    }
+
+    /**
+     * Creates the TubeMQ consumer, wrapping the value decoder in a
+     * {@link DynamicTubeMQDeserializationSchema} together with the converters of the requested
+     * connector metadata keys.
+     *
+     * @param deserialization  runtime decoder for message values
+     * @param producedTypeInfo type information of the rows emitted by the source
+     * @param ignoreErrors     error-handling flag forwarded to the deserialization schema
+     * @return the configured consumer
+     * @throws IllegalStateException if a requested metadata key is not a {@link ReadableMetadata} key
+     */
+    protected FlinkTubeMQConsumer<RowData> createTubeMQConsumer(
+            DeserializationSchema<RowData> deserialization,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreErrors) {
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(k -> Stream.of(ReadableMetadata.values())
+                                .filter(rm -> rm.key.equals(k))
+                                .findFirst()
+                                .orElseThrow(() -> new IllegalStateException(
+                                        "Unsupported TubeMQ metadata key: " + k)))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+        final DeserializationSchema<RowData> tubeMQDeserializer = new DynamicTubeMQDeserializationSchema(
+                deserialization, metadataConverters, producedTypeInfo, ignoreErrors);
+
+        // Diamond operator instead of the raw type so the constructor arguments are type-checked.
+        return new FlinkTubeMQConsumer<>(masterAddress, topic, tidSet,
+                consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Connector metadata that can be read from a TubeMQ {@link Message}.
+     */
+    enum ReadableMetadata {
+
+        /** Topic of the message, exposed as a non-null string. */
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(Message msg) {
+                        return StringData.fromString(msg.getTopic());
+                    }
+                });
+
+        /** Metadata key as used in the table DDL. */
+        final String key;
+
+        /** Data type of the metadata column. */
+        final DataType dataType;
+
+        /** Extracts the metadata value from a message. */
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..ed092ea8e5
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.tubemq.table.TubeMQDynamicTableFactory