This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 16cabe6a3 [Improve][Connector-V2] Pulsar support user-defined schema (#2436)
16cabe6a3 is described below

commit 16cabe6a35d60a12535a43736d9c52fa878e9cbe
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Aug 30 12:14:34 2022 +0800

    [Improve][Connector-V2] Pulsar support user-defined schema (#2436)
    
    * [Improve][Connector-V2] Pulsar support user-defined schema
    
    * import the constant
---
 seatunnel-connectors-v2/connector-pulsar/pom.xml            | 13 +++++++++++++
 .../connectors/seatunnel/pulsar/source/PulsarSource.java    |  7 ++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-pulsar/pom.xml b/seatunnel-connectors-v2/connector-pulsar/pom.xml
index db6aa4fc0..3fd4cfb90 100644
--- a/seatunnel-connectors-v2/connector-pulsar/pom.xml
+++ b/seatunnel-connectors-v2/connector-pulsar/pom.xml
@@ -39,6 +39,19 @@
             <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <!-- Pulsar testing environment -->
 
         <dependency>
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index 88c36d816..efc84ee6b 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -46,9 +46,11 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
@@ -64,6 +66,7 @@ import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discov
 import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -222,7 +225,9 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
 
     private void setDeserialization(Config config) {
         String format = config.getString("format");
-        // TODO: json format
+        // TODO: format SPI
+        SeaTunnelRowType rowType = SeatunnelSchema.buildWithConfig(config.getConfig(SeatunnelSchema.SCHEMA)).getSeaTunnelRowType();
+        deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
     }
 
     @Override

Reply via email to