This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark-website.git
The following commit(s) were added to refs/heads/dev by this push:
new 083c34a9 Update 1-kafka.md (#376)
083c34a9 is described below
commit 083c34a935b52aec52575fbdebe78c91d493df17
Author: Liuqitao <[email protected]>
AuthorDate: Sun May 26 21:34:29 2024 +0800
Update 1-kafka.md (#376)
Content duplication
---
.../current/connector/1-kafka.md | 90 ----------------------
1 file changed, 90 deletions(-)
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md
index b881c0c8..a9812298 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md
@@ -897,96 +897,6 @@ class JavaUser implements Serializable {
</TabItem>
</Tabs>
-### Specifying a SerializationSchema
-
-`Flink Kafka Producer` needs to know how to turn Java/Scala objects into binary data. `KafkaSerializationSchema` lets the user specify such a schema; for usage details, see the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#the-serializationschema).
-
-By default, `KafkaSink` does not specify a serialization scheme and uses `SimpleStringSchema`. A developer can explicitly provide a custom serializer through the `serializationSchema` parameter, for example to write a `user` object to `kafka` in a custom format:
-
-<Tabs>
-<TabItem value="scala" label="Scala" default>
-
-```scala
-import org.apache.streampark.common.util.JsonUtils
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.flink.api.common.serialization.SerializationSchema
-import org.apache.streampark.flink.core.scala.sink.KafkaSink
-import org.apache.streampark.flink.core.scala.source.KafkaSource
-import org.apache.flink.api.scala._
-
-object KafkaSinkApp extends FlinkStreaming {
-
-  override def handle(): Unit = {
-    val source = KafkaSource()
-      .getDataStream[String]()
-      .map(x => JsonUtils.read[User](x.value))
-
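-    // Custom serializer: concatenate the User fields into comma-separated bytes.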
-    KafkaSink().sink[User](source, serialization = new SerializationSchema[User]() {
-      override def serialize(user: User): Array[Byte] = {
-        s"${user.name},${user.age},${user.gender},${user.address}".getBytes
-      }
-    })
-  }
-
-}
-
-case class User(name: String, age: Int, gender: Int, address: String)
-```
-
-</TabItem>
-<TabItem value="Java" label="Java">
-
-```java
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streampark.flink.core.java.sink.KafkaSink;
-import org.apache.streampark.flink.core.java.source.KafkaSource;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.core.scala.source.KafkaRecord;
-import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.io.Serializable;
-
-public class KafkaSinkJavaApp {
-
-    public static void main(String[] args) {
-
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-        StreamingContext context = new StreamingContext(envConfig);
-        ObjectMapper mapper = new ObjectMapper();
-
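-        // Deserialize each Kafka record's JSON payload into a JavaUser.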
-        DataStream<JavaUser> source = new KafkaSource<String>(context)
-            .getDataStream()
-            .map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
-                mapper.readValue(value.value(), JavaUser.class));
-
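-        // Emit each JavaUser as comma-separated bytes via a lambda SerializationSchema.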
-        new KafkaSink<JavaUser>(context)
-            .serializer(
-                (SerializationSchema<JavaUser>) element ->
-                    String.format("%s,%d,%d,%s", element.name, element.age, element.gender, element.address).getBytes()
-            ).sink(source);
-
-        context.start();
-    }
-
-}
-
-class JavaUser implements Serializable {
-    String name;
-    Integer age;
-    Integer gender;
-    String address;
-}
-```
-
-</TabItem>
-</Tabs>
-
### Specifying a partitioner