This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 271541f598 [INLONG-9095][Sort] Support inlong-msg in pulsar flink 1.15 connector (#9096)
271541f598 is described below
commit 271541f59824cf773044ff961367b5f0258b2b19
Author: Sting <[email protected]>
AuthorDate: Tue Oct 24 16:44:40 2023 +0800
[INLONG-9095][Sort] Support inlong-msg in pulsar flink 1.15 connector (#9096)
---
inlong-sort/sort-core/pom.xml | 6 ++++++
.../sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml | 7 ++++++-
.../java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java | 4 +++-
3 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index c463785605..e8303bc57d 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -281,6 +281,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-pulsar-v1.15</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index 9fc0949b46..d2b24382a6 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -28,7 +28,7 @@
<artifactId>sort-connector-pulsar-v1.15</artifactId>
<packaging>jar</packaging>
- <name>Apache InLong - Sort-connector-pulsar-v1.15</name>
+ <name>Apache InLong - Sort-connector-pulsar</name>
<properties>
<pulsar.version>2.10.2</pulsar.version>
@@ -129,6 +129,11 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 6784fc6790..05277ff5a2 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.pulsar;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
import
org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchemaFactory;
import org.apache.inlong.sort.pulsar.table.PulsarTableSource;
@@ -110,7 +111,8 @@ public class PulsarTableFactory implements DynamicTableSourceFactory {
PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
- PulsarSinkOptions.SINK_CONFIG_PREFIX);
+ PulsarSinkOptions.SINK_CONFIG_PREFIX,
+ ExtractNode.INLONG_MSG);
validatePrimaryKeyConstraints(
context.getObjectIdentifier(), context.getPrimaryKeyIndexes(),
helper);