This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 7534802242d  [FLINK-37015][doc] Remove Scala contents in serialization doc
7534802242d is described below

commit 7534802242d775d48a7fd0ced47eb5b6626ccf4f
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Wed Jan 15 09:55:11 2025 +0800

    [FLINK-37015][doc] Remove Scala contents in serialization doc

    * [FLINK-37015][doc] Remove Scala contents in serialization doc
---
 .../serialization/custom_serialization.md |  16 ---
 .../serialization/types_serialization.md  | 148 +--------------------
 2 files changed, 7 insertions(+), 157 deletions(-)

diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
index ea87c8bfc1d..fb2973e8fc2 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md
@@ -43,8 +43,6 @@ to specify the state's name, as well as information about the type of the state
 It is also possible to completely bypass this and let Flink use your own custom
 serializer to serialize managed states, simply by directly instantiating the
 `StateDescriptor` with your own `TypeSerializer` implementation:

-{{< tabs "ee215ff6-2e21-4a40-a1b4-7f114560546f" >}}
-{{< tab "Java" >}}
 ```java
 public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
@@ -55,20 +53,6 @@ ListStateDescriptor<Tuple2<String, Integer>> descriptor =

 checkpointedState = getRuntimeContext().getListState(descriptor);
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
-
-val descriptor = new ListStateDescriptor[(String, Integer)](
-    "state-name",
-    new CustomTypeSerializer)
-)
-
-checkpointedState = getRuntimeContext.getListState(descriptor)
-```
-{{< /tab >}}
-{{< /tabs >}}

 ## State serializers and schema evolution

diff --git a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
index 766024ccd75..532e07ab3c2 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/serialization/types_serialization.md
@@ -39,7 +39,7 @@ efficient execution strategies.

 There are eight different categories of data types:

-1. **Java Tuples** and **Scala Case Classes**
+1. **Java Tuples**
 2. **Java POJOs**
 3. **Primitive Types**
 4. **Common Collection Types**
@@ -48,10 +48,7 @@ There are eight different categories of data types:
 7. **Hadoop Writables**
 8. **Special Types**

-#### Tuples and Case Classes
-
-{{< tabs "e24bb87b-46e8-4f17-8054-c3400aaa6ddc" >}}
-{{< tab "Java" >}}
+#### Tuples

 Tuples are composite types that contain a fixed number of fields with various types.
 The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
@@ -77,30 +74,9 @@ wordCounts.keyBy(value -> value.f0);
 ```

-{{< /tab >}}
-{{< tab "Scala" >}}
-
-Scala case classes (and Scala tuples which are a special case of case classes), are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as `_1` for the first field. Case class fields are accessed by their name.
-
-```scala
-case class WordCount(word: String, count: Int)
-val input = env.fromElements(
-    WordCount("hello", 1),
-    WordCount("world", 2)) // Case Class Data Set
-
-input.keyBy(_.word)
-
-val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
-
-input2.keyBy(value => (value._1, value._2))
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
 #### POJOs

-Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:
+Java classes are treated by Flink as a special POJO data type if they fulfill the following requirements:

 - The class must be public.
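For readers of this thread: the hunk above keeps only fragments of the Java tuple example between its context lines. A complete version of that example reads roughly like the following sketch, reconstructed around the surviving `wordCounts.keyBy(value -> value.f0)` context line rather than quoted verbatim from the file:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TupleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A DataStream of Tuple2 elements; tuple fields are addressed as f0, f1, ...
        DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
                new Tuple2<>("hello", 1),
                new Tuple2<>("world", 2));

        // Key by the first tuple field, as in the keyBy call shown in the hunk context.
        wordCounts.keyBy(value -> value.f0).print();

        env.execute("tuple example");
    }
}
```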
@@ -122,8 +98,6 @@ If you additionally want to ensure that no field of the POJO will be serialized

 The following example shows a simple POJO with two public fields.

-{{< tabs "0589f3b3-76d8-4913-9595-276da92cbc77" >}}
-{{< tab "Java" >}}
 ```java
 public class WordWithCount {
@@ -145,28 +119,10 @@ DataStream<WordWithCount> wordCounts = env.fromElements(

 wordCounts.keyBy(value -> value.word);
 ```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-class WordWithCount(var word: String, var count: Int) {
-    def this() {
-      this(null, -1)
-    }
-}
-
-val input = env.fromElements(
-    new WordWithCount("hello", 1),
-    new WordWithCount("world", 2)) // Case Class Data Set
-
-input.keyBy(_.word)
-
-```
-{{< /tab >}}
-{{< /tabs >}}

 #### Primitive Types

-Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`.
+Flink supports all Java primitive types such as `Integer`, `String`, and `Double`.

 #### Common Collection Types

@@ -185,7 +141,7 @@ also required to be preserved, you also need to register it with a custom serial

 #### General Class Types

-Flink supports most Java and Scala classes (API and custom).
+Flink supports most Java classes (API and custom).
 Restrictions apply to classes containing fields that cannot be serialized, like file
 pointers, I/O streams, or other native resources. Classes that follow the Java Beans
 conventions work well in general.

@@ -218,7 +174,6 @@ defined in the `write()`and `readFields()` methods will be used for serializatio

 #### Special Types

-You can use special types, including Scala's `Either`, `Option`, and `Try`.
 The Java API has its own custom implementation of `Either`.
 Similarly to Scala's `Either`, it represents a value of two possible types, *Left* or *Right*.
 `Either` can be useful for error handling or operators that need to output two different types of records.

@@ -332,8 +287,6 @@ Internally, Flink makes the following distinctions between types:

 * Flink Java Tuples (part of the Flink Java API): max 25 fields, null fields not supported

-* Scala *case classes* (including Scala tuples): null fields not supported
-
 * Row: tuples with arbitrary number of fields and support for null fields

 * POJOs: classes that follow a certain bean-like pattern

@@ -363,12 +316,8 @@ serialized with Kryo.

 #### Creating a TypeInformation or TypeSerializer

-To create a TypeInformation object for a type, use the language specific way:
-
-{{< tabs "013807cc-9f3f-4f91-a770-26b1e5e8c85c" >}}
-{{< tab "Java" >}}
 Because Java generally erases generic type information, you need to pass the type to the TypeInformation
-construction:
+construction. For non-generic types, you can pass the Class:

 ```java
@@ -381,22 +330,6 @@ TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<T
 ```

 Internally, this creates an anonymous subclass of the TypeHint that captures the generic
 information to preserve it until runtime.

-{{< /tab >}}
-{{< tab "Scala" >}}
-In Scala, Flink uses *macros* that runs at compile time and captures all generic type information while it is
-still available.
-```scala
-// important: this import is needed to access the 'createTypeInformation' macro function
-import org.apache.flink.streaming.api.scala._
-
-val stringInfo: TypeInformation[String] = createTypeInformation[String]
-
-val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
-```
-
-You can still use the same method as in Java as a fallback.
-{{< /tab >}}
-{{< /tabs >}}

 There are two ways to create a TypeSerializer.
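Since the hunk boundaries cut the `TypeInformation.of` snippet in two, here is a self-contained sketch of the two construction styles the surviving Java text describes; the class and variable names are illustrative, while the `TypeInformation` and `TypeHint` calls are the standard Flink API:

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeInfoExamples {
    public static void main(String[] args) {
        // Non-generic type: the Class object carries everything Flink needs.
        TypeInformation<String> stringInfo = TypeInformation.of(String.class);

        // Generic type: erasure would lose Tuple2's type arguments, so an anonymous
        // TypeHint subclass captures them and preserves them until runtime.
        TypeInformation<Tuple2<String, Double>> tupleInfo =
                TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {});

        System.out.println(stringInfo);
        System.out.println(tupleInfo);
    }
}
```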
@@ -409,72 +342,6 @@ The second is to use `getRuntimeContext().createSerializer(typeInfo)` within a f
 [Rich Function]({{< ref "docs/dev/datastream/user_defined_functions#rich-functions" >}}) and calling
 `getRuntimeContext().createSerializer(typeInfo)`.

---------
---------
-
-## Type Information in the Scala API
-
-Scala has very elaborate concepts for runtime type information though *type manifests* and *class tags*. In
-general, types and methods have access to the types of their generic parameters - thus, Scala programs do
-not suffer from type erasure as Java programs do.
-
-In addition, Scala allows to run custom code in the Scala Compiler through Scala Macros - that means that some Flink
-code gets executed whenever you compile a Scala program written against Flink's Scala API.
-
-We use the Macros to look at the parameter types and return types of all user functions during compilation - that
-is the point in time when certainly all type information is perfectly available. Within the macro, we create
-a *TypeInformation* for the function's return types (or parameter types) and make it part of the operation.
-
-
-#### No Implicit Value for Evidence Parameter Error
-
-In the case where TypeInformation could not be created, programs fail to compile with an error
-stating *"could not find implicit value for evidence parameter of type TypeInformation"*.
-
-A frequent reason if that the code that generates the TypeInformation has not been imported.
-Make sure to import the entire flink.api.scala package.
-```scala
-import org.apache.flink.api.scala._
-```
-
-Another common cause are generic methods, which can be fixed as described in the following section.
-
-
-#### Generic Methods
-
-Consider the following case below:
-
-```scala
-def selectFirst[T](input: DataStream[(T, _)]) : DataStream[T] = {
-  input.map { v => v._1 }
-}
-
-val data : DataStream[(String, Long) = ...
-
-val result = selectFirst(data)
-```
-
-For such generic methods, the data types of the function parameters and return type may not be the same
-for every call and are not known at the site where the method is defined. The code above will result
-in an error that not enough implicit evidence is available.
-
-In such cases, the type information has to be generated at the invocation site and passed to the
-method. Scala offers *implicit parameters* for that.
-
-The following code tells Scala to bring a type information for *T* into the function. The type
-information will then be generated at the sites where the method is invoked, rather than where the
-method is defined.
-
-```scala
-def selectFirst[T : TypeInformation](input: DataStream[(T, _)]) : DataStream[T] = {
-  input.map { v => v._1 }
-}
-```
-
-
---------
---------
-
 ## Type Information in the Java API
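A note on the surviving Java guidance in the hunk above (the `getRuntimeContext().createSerializer(typeInfo)` context lines): a minimal sketch of that Rich Function approach could look like this. `EventPojo` is a hypothetical record type defined inline for illustration, and the exact `createSerializer` signature is assumed from the doc text rather than verified against a particular Flink release:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class DefensiveCopyMapper
        extends RichMapFunction<DefensiveCopyMapper.EventPojo, DefensiveCopyMapper.EventPojo> {

    /** Minimal illustrative POJO; any Flink-serializable type would do. */
    public static class EventPojo {
        public String id;
        public long timestamp;
    }

    private transient TypeSerializer<EventPojo> serializer;

    @Override
    public EventPojo map(EventPojo value) {
        if (serializer == null) {
            // Created lazily so the runtime context is guaranteed to be initialized;
            // createSerializer(typeInfo) is the RuntimeContext method named in the doc text.
            serializer = getRuntimeContext()
                    .createSerializer(TypeInformation.of(EventPojo.class));
        }
        // A TypeSerializer can deep-copy records in addition to (de)serializing them.
        return serializer.copy(value);
    }
}
```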
@@ -585,8 +452,7 @@ pipeline.serialization-config:

 Alternatively, you can annotate either the corresponding type or a POJO's field using this type
 with the `@org.apache.flink.api.common.typeinfo.TypeInfo` annotation to have the factory associated.
-It can be used in both the Java and Scala API. Note that the type information factory associated via
-configuration option will have higher precedence.
+Note that the type information factory associated via configuration option will have higher precedence.

 The following example shows how to annotate a custom type `MyTuple` and supply custom type information
 for it using a factory in Java.
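The diff ends exactly where that `MyTuple` example would begin. For completeness, here is a sketch of what such a factory-annotated type typically looks like; `@TypeInfo` and `TypeInfoFactory#createTypeInfo(Type, Map)` are the Flink APIs the paragraph names, while `MyTupleTypeInfo` stands in for a custom `TypeInformation` implementation that is not shown here:

```java
import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Associates the factory with the type via the annotation.
@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
    public T0 myfield0;
    public T1 myfield1;
}

// The factory maps the type's generic parameters to a custom TypeInformation.
class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
    @Override
    public TypeInformation<MyTuple> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // MyTupleTypeInfo is a hypothetical custom TypeInformation implementation.
        return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
    }
}
```

Per the paragraph above, a factory wired in via `pipeline.serialization-config` would take precedence over one associated through this annotation.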