This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0278f0c143 [INLONG-9128][Sort] Fix the failed to init TubeMQ source
with InlongMsg type message (#9129)
0278f0c143 is described below
commit 0278f0c1435e6a3d9266ac7a0dd8ebd3fa5cf74d
Author: vernedeng <[email protected]>
AuthorDate: Fri Oct 27 11:02:30 2023 +0800
[INLONG-9128][Sort] Fix the failed to init TubeMQ source with InlongMsg
type message (#9129)
---
.../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 4 +++-
.../inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java | 8 +++-----
.../java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 4 +++-
.../inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java | 4 ++--
4 files changed, 11 insertions(+), 9 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index a4e39946c0..96c27d2b9b 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -209,6 +209,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
@Override
public void open(Configuration parameters) throws Exception {
+
ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress,
consumerGroup);
consumerConfig.setConsumePosition(consumeFromMax
? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -220,7 +221,8 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
messagePullConsumer =
messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, tidSet);
- messagePullConsumer.completeSubscribe(sessionKey, numTasks, true,
currentOffsets);
+ String jobId = getRuntimeContext().getJobId().toString();
+ messagePullConsumer.completeSubscribe(sessionKey.concat(jobId),
numTasks, true, currentOffsets);
running = true;
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 8aad010fba..43fb3e198e 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -35,9 +35,7 @@ import
org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
-import java.util.Arrays;
import java.util.HashSet;
-import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
@@ -60,7 +58,7 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
public static final String IDENTIFIER = "tubemq-inlong";
- public static final List<String> INNERFORMATTYPE =
Arrays.asList("inlong-msg");
+ public static final String INNERFORMATTYPE = "inlong-msg";
public static boolean innerFormat = false;
@@ -76,7 +74,7 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
&& format.getChangelogMode().containsOnly(RowKind.INSERT)) {
Configuration options =
Configuration.fromMap(catalogTable.getOptions());
String formatName =
options.getOptional(FORMAT).orElse(options.get(FORMAT));
- innerFormat = INNERFORMATTYPE.contains(formatName);
+ innerFormat = INNERFORMATTYPE.equals(formatName);
throw new ValidationException(String.format(
"The TubeMQ table '%s' with '%s' format doesn't support
defining PRIMARY KEY constraint"
+ " on the table, because it can't guarantee the
semantic of primary key.",
@@ -109,7 +107,7 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory {
final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat = getValueDecodingFormat(helper);
// validate all options
- helper.validate();
+ helper.validateExcept(INNERFORMATTYPE);
validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueDecodingFormat);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index 5d7e8b27fa..cafa653f65 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -207,6 +207,7 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
@Override
public void open(Configuration parameters) throws Exception {
+
ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress,
consumerGroup);
consumerConfig.setConsumePosition(consumeFromMax
? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET
@@ -218,7 +219,8 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
messagePullConsumer =
messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, streamIdSet);
- messagePullConsumer.completeSubscribe(sessionKey, numTasks, true,
currentOffsets);
+ String jobId = getRuntimeContext().getJobId().toString();
+ messagePullConsumer.completeSubscribe(sessionKey.concat(jobId),
numTasks, true, currentOffsets);
running = true;
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 81b6709418..0472353037 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -117,7 +117,7 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat = getValueDecodingFormat(helper);
// validate all options
- helper.validate();
+ helper.validateExcept(INNERFORMATTYPE);
validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueDecodingFormat);
innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT));
@@ -146,7 +146,7 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat
= getValueEncodingFormat(helper);
// validate all options
- helper.validate();
+ helper.validateExcept(INNERFORMATTYPE);
validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueEncodingFormat);