This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ced02ce25a2 [FLINK-35977][docs] Add missing imports in examples in docs
ced02ce25a2 is described below
commit ced02ce25a208a1154aa114439cda508637a47d7
Author: Peng Lu <[email protected]>
AuthorDate: Mon Nov 25 04:12:57 2024 +0800
[FLINK-35977][docs] Add missing imports in examples in docs
---
.../dev/datastream/operators/process_function.md | 2 +-
docs/content.zh/docs/dev/table/data_stream_api.md | 2 +-
docs/content.zh/docs/dev/table/sourcesSinks.md | 26 +++++++++++++++-------
docs/content.zh/docs/try-flink/datastream.md | 2 +-
.../dev/datastream/operators/process_function.md | 2 +-
docs/content/docs/dev/table/data_stream_api.md | 2 +-
docs/content/docs/dev/table/sourcesSinks.md | 26 +++++++++++++++-------
docs/content/docs/try-flink/datastream.md | 2 +-
8 files changed, 42 insertions(+), 22 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/operators/process_function.md
b/docs/content.zh/docs/dev/datastream/operators/process_function.md
index 82708b653bd..dfa49a624cd 100644
--- a/docs/content.zh/docs/dev/datastream/operators/process_function.md
+++ b/docs/content.zh/docs/dev/datastream/operators/process_function.md
@@ -87,11 +87,11 @@ stream.keyBy(...).process(new MyProcessFunction());
{{< tab "Java" >}}
```java
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
diff --git a/docs/content.zh/docs/dev/table/data_stream_api.md
b/docs/content.zh/docs/dev/table/data_stream_api.md
index d96c075f5e5..7cbb8551e4d 100644
--- a/docs/content.zh/docs/dev/table/data_stream_api.md
+++ b/docs/content.zh/docs/dev/table/data_stream_api.md
@@ -959,11 +959,11 @@ a custom operator that deduplicates the user name using a
`KeyedProcessFunction`
{{< tabs "3f5f5d4e-cd03-48d1-9309-917a6cf66aba" >}}
{{< tab "Java" >}}
```java
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
diff --git a/docs/content.zh/docs/dev/table/sourcesSinks.md
b/docs/content.zh/docs/dev/table/sourcesSinks.md
index 97317a13b34..8530e969dc8 100644
--- a/docs/content.zh/docs/dev/table/sourcesSinks.md
+++ b/docs/content.zh/docs/dev/table/sourcesSinks.md
@@ -391,6 +391,9 @@ import
org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
+import java.util.HashSet;
+import java.util.Set;
+
public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
// 定义所有配置项
@@ -473,6 +476,10 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
public class ChangelogCsvFormatFactory implements DeserializationFormatFactory
{
// 定义所有配置项
@@ -608,6 +615,8 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
+import java.util.List;
+
public class ChangelogCsvFormat implements
DecodingFormat<DeserializationSchema<RowData>> {
private final String columnDelimiter;
@@ -622,8 +631,7 @@ public class ChangelogCsvFormat implements
DecodingFormat<DeserializationSchema<
DynamicTableSource.Context context,
DataType producedDataType) {
// 为 DeserializationSchema 创建类型信息
- final TypeInformation<RowData> producedTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(
- producedDataType);
+ final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
// DeserializationSchema 中的大多数代码无法处理内部数据结构
// 在最后为转换创建一个转换器
@@ -668,6 +676,9 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import java.util.List;
+import java.util.regex.Pattern;
+
public class ChangelogCsvDeserializer implements
DeserializationSchema<RowData> {
private final List<LogicalType> parsingTypes;
@@ -737,10 +748,14 @@ public class ChangelogCsvDeserializer implements
DeserializationSchema<RowData>
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
public class SocketSourceFunction extends RichSourceFunction<RowData>
implements ResultTypeQueryable<RowData> {
private final String hostname;
@@ -763,11 +778,6 @@ public class SocketSourceFunction extends
RichSourceFunction<RowData> implements
return deserializer.getProducedType();
}
- @Override
- public void open(OpenContext openContext) throws Exception {
- deserializer.open(() -> getRuntimeContext().getMetricGroup());
- }
-
@Override
public void run(SourceContext<RowData> ctx) throws Exception {
while (isRunning) {
diff --git a/docs/content.zh/docs/try-flink/datastream.md
b/docs/content.zh/docs/try-flink/datastream.md
index 6bc0b289711..3131f7afce8 100644
--- a/docs/content.zh/docs/try-flink/datastream.md
+++ b/docs/content.zh/docs/try-flink/datastream.md
@@ -546,10 +546,10 @@ private void cleanUp(Context ctx) throws Exception {
```java
package spendreport;
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
diff --git a/docs/content/docs/dev/datastream/operators/process_function.md
b/docs/content/docs/dev/datastream/operators/process_function.md
index 0ec8b739c87..990f6a5dfd7 100644
--- a/docs/content/docs/dev/datastream/operators/process_function.md
+++ b/docs/content/docs/dev/datastream/operators/process_function.md
@@ -94,11 +94,11 @@ session windows. We use `KeyedProcessFunction` here to
illustrate the basic patt
{{< tab "Java" >}}
```java
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
diff --git a/docs/content/docs/dev/table/data_stream_api.md
b/docs/content/docs/dev/table/data_stream_api.md
index 9f77e715fb1..6932ffcdd47 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -957,11 +957,11 @@ a custom operator that deduplicates the user name using a
`KeyedProcessFunction`
{{< tabs "3f5f5d4e-cd03-48d1-9309-917a6cf66aba" >}}
{{< tab "Java" >}}
```java
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
diff --git a/docs/content/docs/dev/table/sourcesSinks.md
b/docs/content/docs/dev/table/sourcesSinks.md
index 5647f57a405..4bf681e8f51 100644
--- a/docs/content/docs/dev/table/sourcesSinks.md
+++ b/docs/content/docs/dev/table/sourcesSinks.md
@@ -472,6 +472,9 @@ import
org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
+import java.util.HashSet;
+import java.util.Set;
+
public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
// define all options statically
@@ -557,6 +560,10 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
public class ChangelogCsvFormatFactory implements DeserializationFormatFactory
{
// define all options statically
@@ -696,6 +703,8 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
+import java.util.List;
+
public class ChangelogCsvFormat implements
DecodingFormat<DeserializationSchema<RowData>> {
private final String columnDelimiter;
@@ -710,8 +719,7 @@ public class ChangelogCsvFormat implements
DecodingFormat<DeserializationSchema<
DynamicTableSource.Context context,
DataType producedDataType) {
// create type information for the DeserializationSchema
- final TypeInformation<RowData> producedTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(
- producedDataType);
+ final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
// most of the code in DeserializationSchema will not work on internal
data structures
// create a converter for conversion at the end
@@ -755,6 +763,9 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import java.util.List;
+import java.util.regex.Pattern;
+
public class ChangelogCsvDeserializer implements
DeserializationSchema<RowData> {
private final List<LogicalType> parsingTypes;
@@ -826,10 +837,14 @@ source function can only work with a parallelism of 1.
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
public class SocketSourceFunction extends RichSourceFunction<RowData>
implements ResultTypeQueryable<RowData> {
private final String hostname;
@@ -852,11 +867,6 @@ public class SocketSourceFunction extends
RichSourceFunction<RowData> implements
return deserializer.getProducedType();
}
- @Override
- public void open(OpenContext openContext) throws Exception {
- deserializer.open(() -> getRuntimeContext().getMetricGroup());
- }
-
@Override
public void run(SourceContext<RowData> ctx) throws Exception {
while (isRunning) {
diff --git a/docs/content/docs/try-flink/datastream.md
b/docs/content/docs/try-flink/datastream.md
index 3c7be9905b1..3d0b607d5ea 100644
--- a/docs/content/docs/try-flink/datastream.md
+++ b/docs/content/docs/try-flink/datastream.md
@@ -489,10 +489,10 @@ And that's it, a fully functional, stateful, distributed
streaming application!
{{< tabs "finalapplication" >}}
{{< tab "Java" >}}
```java
+import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;