This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 206f4d02 Fix Kafka Sink on flink cannot serialize element (#1955)
206f4d02 is described below

commit 206f4d02dc058bbb05fe8a739942a1fd185c5948
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu May 26 12:46:11 2022 +0800

    Fix Kafka Sink on flink cannot serialize element (#1955)
---
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 39 +++++++++++++++++++---
 .../kafka/serialize/SeaTunnelRowSerializer.java    | 17 ++++++++++
 .../kafka/sink/KafkaNoTransactionSender.java       |  9 ++---
 .../seatunnel/kafka/sink/KafkaProduceSender.java   |  5 +--
 .../connectors/seatunnel/kafka/sink/KafkaSink.java |  3 ++
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 37 ++++++++++++--------
 .../kafka/sink/KafkaTransactionSender.java         | 10 ++----
 .../execution/AbstractPluginExecuteProcessor.java  |  2 ++
 .../flink/execution/FlinkTaskExecution.java        | 29 ++++++++++++----
 .../flink/execution/SinkExecuteProcessor.java      | 38 ++++++++++++++-------
 .../flink/execution/SourceExecuteProcessor.java    | 11 +++---
 .../flink/execution/TransformExecuteProcessor.java |  8 ++---
 12 files changed, 147 insertions(+), 61 deletions(-)

diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index bb6c14f8..84dfac44 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -1,19 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.common.config.Common;
 
+import com.alibaba.fastjson.JSON;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer<String, SeaTunnelRow> {
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer<String, String> {
 
     private final String topic;
+    private final SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
 
-    public DefaultSeaTunnelRowSerializer(String topic) {
+    public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowTypeInfo 
seaTunnelRowTypeInfo) {
         this.topic = topic;
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
     }
 
     @Override
-    public ProducerRecord<String, SeaTunnelRow> serializeRow(SeaTunnelRow row) 
{
-        return new ProducerRecord<>(topic, null, row);
+    public ProducerRecord<String, String> serializeRow(SeaTunnelRow row) {
+        Map<Object, Object> map = new HashMap<>(Common.COLLECTION_SIZE);
+        String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
+        Object[] fields = row.getFields();
+        for (int i = 0; i < fieldNames.length; i++) {
+            map.put(fieldNames[i], fields[i]);
+        }
+        return new ProducerRecord<>(topic, null, JSON.toJSONString(map));
     }
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
index ae92e2a9..9f12591e 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
index c1339e48..3691acd9 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
 
@@ -39,16 +37,13 @@ import java.util.Properties;
 public class KafkaNoTransactionSender<K, V> implements KafkaProduceSender<K, 
V> {
 
     private final KafkaProducer<K, V> kafkaProducer;
-    private final SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer;
 
-    public KafkaNoTransactionSender(Properties properties, 
SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer) {
-        this.seaTunnelRowSerializer = seaTunnelRowSerializer;
+    public KafkaNoTransactionSender(Properties properties) {
         this.kafkaProducer = new KafkaProducer<>(properties);
     }
 
     @Override
-    public void send(SeaTunnelRow seaTunnelRow) {
-        ProducerRecord<K, V> producerRecord = 
seaTunnelRowSerializer.serializeRow(seaTunnelRow);
+    public void send(ProducerRecord<K, V> producerRecord) {
         kafkaProducer.send(producerRecord);
     }
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
index 1444755a..9836cb9e 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
@@ -17,10 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
+
 import java.util.List;
 import java.util.Optional;
 
@@ -30,7 +31,7 @@ public interface KafkaProduceSender<K, V> extends 
AutoCloseable {
      *
      * @param seaTunnelRow data to send
      */
-    void send(SeaTunnelRow seaTunnelRow);
+    void send(ProducerRecord<K, V> producerRecord);
 
     void beginTransaction();
 
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index c6a96e3f..5e63d79c 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -31,6 +31,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.auto.service.AutoService;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -39,6 +41,7 @@ import java.util.Optional;
  * Kafka Sink implementation by using SeaTunnel sink API.
  * This class contains the method to create {@link KafkaSinkWriter} and {@link 
KafkaSinkCommitter}.
  */
+@AutoService(SeaTunnelSink.class)
 public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaState, 
KafkaCommitInfo, KafkaAggregatedCommitInfo> {
 
     private Config pluginConfig;
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index c202c699..6d3a122c 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -29,6 +29,10 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -42,7 +46,16 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
     private final Config pluginConfig;
 
-    private KafkaProduceSender<?, ?> kafkaProducerSender;
+    private KafkaProduceSender<String, String> kafkaProducerSender;
+
+    // check config
+    @Override
+    public void write(SeaTunnelRow element) {
+        ProducerRecord<String, String> producerRecord = 
seaTunnelRowSerializer.serializeRow(element);
+        kafkaProducerSender.send(producerRecord);
+    }
+
+    private SeaTunnelRowSerializer<String, String> seaTunnelRowSerializer;
 
     public KafkaSinkWriter(
         SinkWriter.Context context,
@@ -52,25 +65,17 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         this.context = context;
         this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
         this.pluginConfig = pluginConfig;
+        this.seaTunnelRowSerializer = getSerializer(pluginConfig, 
seaTunnelRowTypeInfo);
         if 
(KafkaSemantics.AT_LEAST_ONCE.equals(getKafkaSemantics(pluginConfig))) {
             // the recover state
-            this.kafkaProducerSender = new KafkaTransactionSender<>(
-                getKafkaProperties(pluginConfig),
-                getSerializer(pluginConfig));
+            this.kafkaProducerSender = new 
KafkaTransactionSender<>(getKafkaProperties(pluginConfig));
             this.kafkaProducerSender.abortTransaction(kafkaStates);
             this.kafkaProducerSender.beginTransaction();
         } else {
-            this.kafkaProducerSender = new KafkaNoTransactionSender<>(
-                getKafkaProperties(pluginConfig),
-                getSerializer(pluginConfig));
+            this.kafkaProducerSender = new 
KafkaNoTransactionSender<>(getKafkaProperties(pluginConfig));
         }
     }
 
-    @Override
-    public void write(SeaTunnelRow element) {
-        kafkaProducerSender.send(element);
-    }
-
     @Override
     public List<KafkaState> snapshotState() {
         return kafkaProducerSender.snapshotState();
@@ -102,11 +107,15 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         kafkaConfig.entrySet().forEach(entry -> {
             kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
         });
+        kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         return kafkaProperties;
     }
 
-    private SeaTunnelRowSerializer<?, ?> getSerializer(Config pluginConfig) {
-        return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString("topic"));
+    // todo: parse the target field from config
+    private SeaTunnelRowSerializer<String, String> getSerializer(Config 
pluginConfig, SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+        return new 
DefaultSeaTunnelRowSerializer(pluginConfig.getString("topics"), 
seaTunnelRowTypeInfo);
     }
 
     private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index 2956e361..766313c4 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
 
@@ -44,21 +42,17 @@ public class KafkaTransactionSender<K, V> implements 
KafkaProduceSender<K, V> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaTransactionSender.class);
 
     private final KafkaProducer<K, V> kafkaProducer;
-    private final SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer;
     private final String transactionId;
     private final Properties kafkaProperties;
 
-    public KafkaTransactionSender(Properties kafkaProperties,
-                                  SeaTunnelRowSerializer<K, V> 
seaTunnelRowSerializer) {
+    public KafkaTransactionSender(Properties kafkaProperties) {
         this.kafkaProperties = kafkaProperties;
-        this.seaTunnelRowSerializer = seaTunnelRowSerializer;
         this.transactionId = getTransactionId();
         this.kafkaProducer = getTransactionProducer(kafkaProperties, 
transactionId);
     }
 
     @Override
-    public void send(SeaTunnelRow seaTunnelRow) {
-        ProducerRecord<K, V> producerRecord = 
seaTunnelRowSerializer.serializeRow(seaTunnelRow);
+    public void send(ProducerRecord<K, V> producerRecord) {
         kafkaProducer.send(producerRecord);
     }
 
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 9df335a3..6c3ebceb 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -38,6 +38,8 @@ public abstract class AbstractPluginExecuteProcessor<T> 
implements PluginExecute
     protected final FlinkEnvironment flinkEnvironment;
     protected final List<? extends Config> pluginConfigs;
     protected final List<T> plugins;
+    protected static final String ENGINE_TYPE = "seatunnel";
+    protected static final String PLUGIN_NAME = "plugin_name";
 
     protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                              List<? extends Config> 
pluginConfigs) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
index 7805201f..a2fcd10f 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
@@ -17,9 +17,8 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -47,10 +46,7 @@ public class FlinkTaskExecution {
 
     public FlinkTaskExecution(Config config) {
         this.config = config;
-        // todo: create the environment
-        this.flinkEnvironment = (FlinkEnvironment) new 
EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
-        this.flinkEnvironment.setJobMode(JobMode.STREAMING);
-        this.flinkEnvironment.prepare();
+        this.flinkEnvironment = createFlinkEnvironment();
         this.sourcePluginExecuteProcessor = new 
SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
         this.transformPluginExecuteProcessor = new 
TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
         this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
@@ -65,4 +61,25 @@ public class FlinkTaskExecution {
         LOGGER.info("Flink Execution Plan:{}", 
flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
         
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
     }
+
+    private FlinkEnvironment createFlinkEnvironment() {
+        // todo: we need to split the new api into a separate module.
+        // we override the environment here, since we need to create 
StreamExecutionEnvironment.
+        FlinkEnvironment flinkEnvironment = new FlinkEnvironment() {
+            @Override
+            public boolean isStreaming() {
+                return true;
+            }
+        };
+        Config envConfig = config.getConfig("env");
+        JobMode jobMode = JobMode.STREAMING;
+        if (envConfig.hasPath("job.mode")) {
+            jobMode = envConfig.getEnum(JobMode.class, "job.mode");
+        }
+        SeaTunnelContext.getContext().setJobMode(jobMode);
+        flinkEnvironment.setConfig(envConfig)
+            .setJobMode(jobMode)
+            .prepare();
+        return flinkEnvironment;
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index cdabafb9..24b8fffc 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -18,28 +18,34 @@
 package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.common.collect.Lists;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import scala.Serializable;
 
-public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<Sink<Row, Serializable, Serializable, 
Serializable>> {
+public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunnelSink<SeaTunnelRow, Serializable, 
Serializable, Serializable>> {
+
+    private static final String PLUGIN_TYPE = "sink";
 
     protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                    List<? extends Config> pluginConfigs) {
@@ -47,19 +53,16 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<Sink<Ro
     }
 
     @Override
-    protected List<Sink<Row, Serializable, Serializable, Serializable>> 
initializePlugins(List<? extends Config> pluginConfigs) {
+    protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new 
SeaTunnelSinkPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
-        FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, 
Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
-        List<Sink<Row, Serializable, Serializable, Serializable>> sinks = 
pluginConfigs.stream().map(sinkConfig -> {
-            PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                "seatunnel",
-                "sink",
-                sinkConfig.getString("plugin_name"));
+        List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
+            PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, 
sinkConfig.getString(PLUGIN_NAME));
             
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable> pluginInstance =
+            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable> seaTunnelSink =
                 sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
-            return flinkSinkConverter.convert(pluginInstance, 
Collections.emptyMap());
+            seaTunnelSink.prepare(sinkConfig);
+            return seaTunnelSink;
         }).collect(Collectors.toList());
         flinkEnvironment.registerPlugin(pluginJars);
         return sinks;
@@ -68,12 +71,23 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<Sink<Ro
     @Override
     public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams) throws Exception {
         DataStream<Row> input = upstreamDataStreams.get(0);
+        FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, 
Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
+            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, 
Serializable> seaTunnelSink = plugins.get(i);
             DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
-            stream.sinkTo(plugins.get(i));
+            seaTunnelSink.setTypeInfo(getSeaTunnelRowTypeInfo(stream));
+            stream.sinkTo(flinkSinkConverter.convert(seaTunnelSink, 
Collections.emptyMap()));
         }
         // the sink is the last stream
         return null;
     }
+
+    private SeaTunnelRowTypeInfo getSeaTunnelRowTypeInfo(DataStream<Row> 
stream) {
+        RowTypeInfo typeInformation = (RowTypeInfo) stream.getType();
+        String[] fieldNames = typeInformation.getFieldNames();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = 
Arrays.stream(typeInformation.getFieldTypes())
+            
.map(TypeConverterUtils::convertType).toArray(SeaTunnelDataType[]::new);
+        return new SeaTunnelRowTypeInfo(fieldNames, seaTunnelDataTypes);
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 911698e6..689b03f0 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -36,6 +37,8 @@ import java.util.List;
 
 public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunnelParallelSource> {
 
+    private static final String PLUGIN_TYPE = "source";
+
     public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                   List<? extends Config> sourceConfigs) {
         super(flinkEnvironment, sourceConfigs);
@@ -61,11 +64,11 @@ public class SourceExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTu
         List<URL> jars = new ArrayList<>();
         for (Config sourceConfig : pluginConfigs) {
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                "seatunnel",
-                "source",
-                sourceConfig.getString("plugin_name"));
+                ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
             
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            sources.add(new 
SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
+            SeaTunnelSource seaTunnelSource = 
sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+            seaTunnelSource.prepare(sourceConfig);
+            sources.add(new SeaTunnelParallelSource(seaTunnelSource));
         }
         flinkEnvironment.registerPlugin(jars);
         return sources;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 9bd5ed8f..e06c761e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -34,6 +34,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<FlinkStreamTransform> {
+
+    private static final String PLUGIN_TYPE = "transform";
+
     protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                         List<? extends Config> pluginConfigs) {
         super(flinkEnvironment, pluginConfigs);
@@ -45,10 +48,7 @@ public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<Fl
         List<URL> pluginJars = new ArrayList<>();
         List<FlinkStreamTransform> transforms = pluginConfigs.stream()
             .map(transformConfig -> {
-                PluginIdentifier pluginIdentifier = PluginIdentifier.of(
-                    "seatunnel",
-                    "transform",
-                    transformConfig.getString("plugin_name"));
+                PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, 
transformConfig.getString(PLUGIN_NAME));
                 
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
                 FlinkStreamTransform pluginInstance = (FlinkStreamTransform) 
transformPluginDiscovery.getPluginInstance(pluginIdentifier);
                 pluginInstance.setConfig(transformConfig);

Reply via email to