This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 16cabe6a3 [Improve][Connector-V2] Pulsar support user-defined schema (#2436)
16cabe6a3 is described below
commit 16cabe6a35d60a12535a43736d9c52fa878e9cbe
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Aug 30 12:14:34 2022 +0800
[Improve][Connector-V2] Pulsar support user-defined schema (#2436)
* [Improve][Connector-V2] Pulsar support user-defined schema
* import the constant
---
seatunnel-connectors-v2/connector-pulsar/pom.xml | 13 +++++++++++++
.../connectors/seatunnel/pulsar/source/PulsarSource.java | 7 ++++++-
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors-v2/connector-pulsar/pom.xml b/seatunnel-connectors-v2/connector-pulsar/pom.xml
index db6aa4fc0..3fd4cfb90 100644
--- a/seatunnel-connectors-v2/connector-pulsar/pom.xml
+++ b/seatunnel-connectors-v2/connector-pulsar/pom.xml
@@ -39,6 +39,19 @@
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- Pulsar testing environment -->
<dependency>
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index 88c36d816..efc84ee6b 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -46,9 +46,11 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
@@ -64,6 +66,7 @@ import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discov
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -222,7 +225,9 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
private void setDeserialization(Config config) {
String format = config.getString("format");
- // TODO: json format
+ // TODO: format SPI
+ SeaTunnelRowType rowType = SeatunnelSchema.buildWithConfig(config.getConfig(SeatunnelSchema.SCHEMA)).getSeaTunnelRowType();
+ deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
}
@Override