This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 206f4d02 Fix Kafka Sink on Flink: element cannot be serialized (#1955)
206f4d02 is described below
commit 206f4d02dc058bbb05fe8a739942a1fd185c5948
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu May 26 12:46:11 2022 +0800
Fix Kafka Sink on Flink: element cannot be serialized (#1955)
---
.../serialize/DefaultSeaTunnelRowSerializer.java | 39 +++++++++++++++++++---
.../kafka/serialize/SeaTunnelRowSerializer.java | 17 ++++++++++
.../kafka/sink/KafkaNoTransactionSender.java | 9 ++---
.../seatunnel/kafka/sink/KafkaProduceSender.java | 5 +--
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 3 ++
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 37 ++++++++++++--------
.../kafka/sink/KafkaTransactionSender.java | 10 ++----
.../execution/AbstractPluginExecuteProcessor.java | 2 ++
.../flink/execution/FlinkTaskExecution.java | 29 ++++++++++++----
.../flink/execution/SinkExecuteProcessor.java | 38 ++++++++++++++-------
.../flink/execution/SourceExecuteProcessor.java | 11 +++---
.../flink/execution/TransformExecuteProcessor.java | 8 ++---
12 files changed, 147 insertions(+), 61 deletions(-)
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index bb6c14f8..84dfac44 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -1,19 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.common.config.Common;
+import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
-public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<String, SeaTunnelRow> {
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<String, String> {
private final String topic;
+ private final SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
- public DefaultSeaTunnelRowSerializer(String topic) {
+ public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowTypeInfo
seaTunnelRowTypeInfo) {
this.topic = topic;
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
}
@Override
- public ProducerRecord<String, SeaTunnelRow> serializeRow(SeaTunnelRow row)
{
- return new ProducerRecord<>(topic, null, row);
+ public ProducerRecord<String, String> serializeRow(SeaTunnelRow row) {
+ Map<Object, Object> map = new HashMap<>(Common.COLLECTION_SIZE);
+ String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
+ Object[] fields = row.getFields();
+ for (int i = 0; i < fieldNames.length; i++) {
+ map.put(fieldNames[i], fields[i]);
+ }
+ return new ProducerRecord<>(topic, null, JSON.toJSONString(map));
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
index ae92e2a9..9f12591e 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
index c1339e48..3691acd9 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
@@ -39,16 +37,13 @@ import java.util.Properties;
public class KafkaNoTransactionSender<K, V> implements KafkaProduceSender<K,
V> {
private final KafkaProducer<K, V> kafkaProducer;
- private final SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer;
- public KafkaNoTransactionSender(Properties properties,
SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer) {
- this.seaTunnelRowSerializer = seaTunnelRowSerializer;
+ public KafkaNoTransactionSender(Properties properties) {
this.kafkaProducer = new KafkaProducer<>(properties);
}
@Override
- public void send(SeaTunnelRow seaTunnelRow) {
- ProducerRecord<K, V> producerRecord =
seaTunnelRowSerializer.serializeRow(seaTunnelRow);
+ public void send(ProducerRecord<K, V> producerRecord) {
kafkaProducer.send(producerRecord);
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
index 1444755a..9836cb9e 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
@@ -17,10 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
import java.util.List;
import java.util.Optional;
@@ -30,7 +31,7 @@ public interface KafkaProduceSender<K, V> extends
AutoCloseable {
*
* @param seaTunnelRow data to send
*/
- void send(SeaTunnelRow seaTunnelRow);
+ void send(ProducerRecord<K, V> producerRecord);
void beginTransaction();
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index c6a96e3f..5e63d79c 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -31,6 +31,8 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
+
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -39,6 +41,7 @@ import java.util.Optional;
* Kafka Sink implementation by using SeaTunnel sink API.
* This class contains the method to create {@link KafkaSinkWriter} and {@link
KafkaSinkCommitter}.
*/
+@AutoService(SeaTunnelSink.class)
public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaState,
KafkaCommitInfo, KafkaAggregatedCommitInfo> {
private Config pluginConfig;
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index c202c699..6d3a122c 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -29,6 +29,10 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -42,7 +46,16 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
private final Config pluginConfig;
- private KafkaProduceSender<?, ?> kafkaProducerSender;
+ private KafkaProduceSender<String, String> kafkaProducerSender;
+
+ // check config
+ @Override
+ public void write(SeaTunnelRow element) {
+ ProducerRecord<String, String> producerRecord =
seaTunnelRowSerializer.serializeRow(element);
+ kafkaProducerSender.send(producerRecord);
+ }
+
+ private SeaTunnelRowSerializer<String, String> seaTunnelRowSerializer;
public KafkaSinkWriter(
SinkWriter.Context context,
@@ -52,25 +65,17 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
this.context = context;
this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
this.pluginConfig = pluginConfig;
+ this.seaTunnelRowSerializer = getSerializer(pluginConfig,
seaTunnelRowTypeInfo);
if
(KafkaSemantics.AT_LEAST_ONCE.equals(getKafkaSemantics(pluginConfig))) {
// the recover state
- this.kafkaProducerSender = new KafkaTransactionSender<>(
- getKafkaProperties(pluginConfig),
- getSerializer(pluginConfig));
+ this.kafkaProducerSender = new
KafkaTransactionSender<>(getKafkaProperties(pluginConfig));
this.kafkaProducerSender.abortTransaction(kafkaStates);
this.kafkaProducerSender.beginTransaction();
} else {
- this.kafkaProducerSender = new KafkaNoTransactionSender<>(
- getKafkaProperties(pluginConfig),
- getSerializer(pluginConfig));
+ this.kafkaProducerSender = new
KafkaNoTransactionSender<>(getKafkaProperties(pluginConfig));
}
}
- @Override
- public void write(SeaTunnelRow element) {
- kafkaProducerSender.send(element);
- }
-
@Override
public List<KafkaState> snapshotState() {
return kafkaProducerSender.snapshotState();
@@ -102,11 +107,15 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
kafkaConfig.entrySet().forEach(entry -> {
kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
});
+ kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return kafkaProperties;
}
- private SeaTunnelRowSerializer<?, ?> getSerializer(Config pluginConfig) {
- return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString("topic"));
+ // todo: parse the target field from config
+ private SeaTunnelRowSerializer<String, String> getSerializer(Config
pluginConfig, SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+ return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString("topics"),
seaTunnelRowTypeInfo);
}
private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index 2956e361..766313c4 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
@@ -44,21 +42,17 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaTransactionSender.class);
private final KafkaProducer<K, V> kafkaProducer;
- private final SeaTunnelRowSerializer<K, V> seaTunnelRowSerializer;
private final String transactionId;
private final Properties kafkaProperties;
- public KafkaTransactionSender(Properties kafkaProperties,
- SeaTunnelRowSerializer<K, V>
seaTunnelRowSerializer) {
+ public KafkaTransactionSender(Properties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
- this.seaTunnelRowSerializer = seaTunnelRowSerializer;
this.transactionId = getTransactionId();
this.kafkaProducer = getTransactionProducer(kafkaProperties,
transactionId);
}
@Override
- public void send(SeaTunnelRow seaTunnelRow) {
- ProducerRecord<K, V> producerRecord =
seaTunnelRowSerializer.serializeRow(seaTunnelRow);
+ public void send(ProducerRecord<K, V> producerRecord) {
kafkaProducer.send(producerRecord);
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 9df335a3..6c3ebceb 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -38,6 +38,8 @@ public abstract class AbstractPluginExecuteProcessor<T>
implements PluginExecute
protected final FlinkEnvironment flinkEnvironment;
protected final List<? extends Config> pluginConfigs;
protected final List<T> plugins;
+ protected static final String ENGINE_TYPE = "seatunnel";
+ protected static final String PLUGIN_NAME = "plugin_name";
protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config>
pluginConfigs) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
index 7805201f..a2fcd10f 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
@@ -17,9 +17,8 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -47,10 +46,7 @@ public class FlinkTaskExecution {
public FlinkTaskExecution(Config config) {
this.config = config;
- // todo: create the environment
- this.flinkEnvironment = (FlinkEnvironment) new
EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
- this.flinkEnvironment.setJobMode(JobMode.STREAMING);
- this.flinkEnvironment.prepare();
+ this.flinkEnvironment = createFlinkEnvironment();
this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
@@ -65,4 +61,25 @@ public class FlinkTaskExecution {
LOGGER.info("Flink Execution Plan:{}",
flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
}
+
+ private FlinkEnvironment createFlinkEnvironment() {
+ // todo: we need to split the new api into a separate module.
+ // we override the environment here, since we need to create
StreamExecutionEnvironment.
+ FlinkEnvironment flinkEnvironment = new FlinkEnvironment() {
+ @Override
+ public boolean isStreaming() {
+ return true;
+ }
+ };
+ Config envConfig = config.getConfig("env");
+ JobMode jobMode = JobMode.STREAMING;
+ if (envConfig.hasPath("job.mode")) {
+ jobMode = envConfig.getEnum(JobMode.class, "job.mode");
+ }
+ SeaTunnelContext.getContext().setJobMode(jobMode);
+ flinkEnvironment.setConfig(envConfig)
+ .setJobMode(jobMode)
+ .prepare();
+ return flinkEnvironment;
+ }
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index cdabafb9..24b8fffc 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -18,28 +18,34 @@
package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.common.collect.Lists;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import scala.Serializable;
-public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<Sink<Row, Serializable, Serializable,
Serializable>> {
+public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunnelSink<SeaTunnelRow, Serializable,
Serializable, Serializable>> {
+
+ private static final String PLUGIN_TYPE = "sink";
protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config> pluginConfigs) {
@@ -47,19 +53,16 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<Sink<Ro
}
@Override
- protected List<Sink<Row, Serializable, Serializable, Serializable>>
initializePlugins(List<? extends Config> pluginConfigs) {
+ protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
- FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable,
Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
- List<Sink<Row, Serializable, Serializable, Serializable>> sinks =
pluginConfigs.stream().map(sinkConfig -> {
- PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "sink",
- sinkConfig.getString("plugin_name"));
+ List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
+ PluginIdentifier pluginIdentifier =
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE,
sinkConfig.getString(PLUGIN_NAME));
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> pluginInstance =
+ SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> seaTunnelSink =
sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
- return flinkSinkConverter.convert(pluginInstance,
Collections.emptyMap());
+ seaTunnelSink.prepare(sinkConfig);
+ return seaTunnelSink;
}).collect(Collectors.toList());
flinkEnvironment.registerPlugin(pluginJars);
return sinks;
@@ -68,12 +71,23 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<Sink<Ro
@Override
public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams) throws Exception {
DataStream<Row> input = upstreamDataStreams.get(0);
+ FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable,
Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
+ SeaTunnelSink<SeaTunnelRow, Serializable, Serializable,
Serializable> seaTunnelSink = plugins.get(i);
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
- stream.sinkTo(plugins.get(i));
+ seaTunnelSink.setTypeInfo(getSeaTunnelRowTypeInfo(stream));
+ stream.sinkTo(flinkSinkConverter.convert(seaTunnelSink,
Collections.emptyMap()));
}
// the sink is the last stream
return null;
}
+
+ private SeaTunnelRowTypeInfo getSeaTunnelRowTypeInfo(DataStream<Row>
stream) {
+ RowTypeInfo typeInformation = (RowTypeInfo) stream.getType();
+ String[] fieldNames = typeInformation.getFieldNames();
+ SeaTunnelDataType<?>[] seaTunnelDataTypes =
Arrays.stream(typeInformation.getFieldTypes())
+
.map(TypeConverterUtils::convertType).toArray(SeaTunnelDataType[]::new);
+ return new SeaTunnelRowTypeInfo(fieldNames, seaTunnelDataTypes);
+ }
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 911698e6..689b03f0 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -36,6 +37,8 @@ import java.util.List;
public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunnelParallelSource> {
+ private static final String PLUGIN_TYPE = "source";
+
public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config> sourceConfigs) {
super(flinkEnvironment, sourceConfigs);
@@ -61,11 +64,11 @@ public class SourceExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTu
List<URL> jars = new ArrayList<>();
for (Config sourceConfig : pluginConfigs) {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "source",
- sourceConfig.getString("plugin_name"));
+ ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- sources.add(new
SeaTunnelParallelSource(sourcePluginDiscovery.getPluginInstance(pluginIdentifier)));
+ SeaTunnelSource seaTunnelSource =
sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ seaTunnelSource.prepare(sourceConfig);
+ sources.add(new SeaTunnelParallelSource(seaTunnelSource));
}
flinkEnvironment.registerPlugin(jars);
return sources;
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 9bd5ed8f..e06c761e 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -34,6 +34,9 @@ import java.util.List;
import java.util.stream.Collectors;
public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<FlinkStreamTransform> {
+
+ private static final String PLUGIN_TYPE = "transform";
+
protected TransformExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config> pluginConfigs) {
super(flinkEnvironment, pluginConfigs);
@@ -45,10 +48,7 @@ public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<Fl
List<URL> pluginJars = new ArrayList<>();
List<FlinkStreamTransform> transforms = pluginConfigs.stream()
.map(transformConfig -> {
- PluginIdentifier pluginIdentifier = PluginIdentifier.of(
- "seatunnel",
- "transform",
- transformConfig.getString("plugin_name"));
+ PluginIdentifier pluginIdentifier =
PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE,
transformConfig.getString(PLUGIN_NAME));
pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
FlinkStreamTransform pluginInstance = (FlinkStreamTransform)
transformPluginDiscovery.getPluginInstance(pluginIdentifier);
pluginInstance.setConfig(transformConfig);