This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e74592ca92f [FLINK-33755] Cleanup usage of deprecated
StreamExecutionEnvironment#generateSequence
e74592ca92f is described below
commit e74592ca92f4eac5bef6e5140ae4e8cc2f0bf1a1
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Dec 5 17:33:29 2023 +0100
[FLINK-33755] Cleanup usage of deprecated
StreamExecutionEnvironment#generateSequence
---
docs/content.zh/docs/dev/datastream/overview.md | 4 +-
docs/content/docs/dev/datastream/overview.md | 4 +-
.../apache/flink/streaming/api/DataStreamTest.java | 52 +++++++++++-----------
.../apache/flink/streaming/api/TypeFillTest.java | 2 +-
.../StreamExecutionEnvironmentTest.java | 7 +--
.../streaming/api/graph/SlotAllocationTest.java | 24 +++++-----
.../graph/StreamGraphCoLocationConstraintTest.java | 4 +-
.../flink/streaming/graph/TranslationTest.java | 2 +-
.../streaming/api/scala/BroadcastStateITCase.scala | 2 +-
.../flink/streaming/api/scala/DataStreamTest.scala | 22 ++++-----
.../streaming/api/scala/SlotAllocationTest.scala | 4 +-
.../streaming/api/scala/StateTestPrograms.scala | 4 +-
.../environment/LocalStreamEnvironmentITCase.java | 2 +-
.../test/streaming/experimental/CollectITCase.java | 2 +-
.../streaming/runtime/BroadcastStateITCase.java | 4 +-
15 files changed, 70 insertions(+), 69 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/overview.md
b/docs/content.zh/docs/dev/datastream/overview.md
index 9544803692e..75c05d5dd22 100644
--- a/docs/content.zh/docs/dev/datastream/overview.md
+++ b/docs/content.zh/docs/dev/datastream/overview.md
@@ -313,7 +313,7 @@ Source 是你的程序从中读取其输入的地方。你可以用 `StreamExecu
- `fromParallelCollection(SplittableIterator, Class)` - 从迭代器并行创建数据流。class
参数指定迭代器返回元素的数据类型。
-- `generateSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。
+- `fromSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。
自定义:
@@ -357,7 +357,7 @@ Source 是你的程序从中读取其输入的地方。你可以用 `StreamExecu
- `fromParallelCollection(SplittableIterator, Class)` - 从迭代器并行创建数据流。class
参数指定迭代器返回元素的数据类型。
-- `generateSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。
+- `fromSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。
自定义:
diff --git a/docs/content/docs/dev/datastream/overview.md
b/docs/content/docs/dev/datastream/overview.md
index b72fcf7c871..0034cbd17d9 100644
--- a/docs/content/docs/dev/datastream/overview.md
+++ b/docs/content/docs/dev/datastream/overview.md
@@ -386,7 +386,7 @@ Collection-based:
- `fromParallelCollection(SplittableIterator, Class)` - Creates a data stream
from an iterator, in
parallel. The class specifies the data type of the elements returned by the
iterator.
-- `generateSequence(from, to)` - Generates the sequence of numbers in the
given interval, in
+- `fromSequence(from, to)` - Generates the sequence of numbers in the given
interval, in
parallel.
Custom:
@@ -441,7 +441,7 @@ Collection-based:
- `fromParallelCollection(SplittableIterator)` - Creates a data stream from an
iterator, in
parallel. The class specifies the data type of the elements returned by the
iterator.
-- `generateSequence(from, to)` - Generates the sequence of numbers in the
given interval, in
+- `fromSequence(from, to)` - Generates the sequence of numbers in the given
interval, in
parallel.
Custom:
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 936d853c010..6b75b1d2ee7 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -140,7 +140,7 @@ public class DataStreamTest extends TestLogger {
env.setParallelism(4);
DataStream<Long> input1 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.map(
new MapFunction<Long, Long>() {
@Override
@@ -160,7 +160,7 @@ public class DataStreamTest extends TestLogger {
});
DataStream<Long> input6 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.map(
new MapFunction<Long, Long>() {
@Override
@@ -181,7 +181,7 @@ public class DataStreamTest extends TestLogger {
});
DataStream<Long> input2 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.map(
new MapFunction<Long, Long>() {
@Override
@@ -192,7 +192,7 @@ public class DataStreamTest extends TestLogger {
.setParallelism(4);
DataStream<Long> input3 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.map(
new MapFunction<Long, Long>() {
@Override
@@ -214,7 +214,7 @@ public class DataStreamTest extends TestLogger {
.setParallelism(4);
DataStream<Long> input4 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.map(
new MapFunction<Long, Long>() {
@Override
@@ -225,7 +225,7 @@ public class DataStreamTest extends TestLogger {
.setParallelism(2);
DataStream<Long> input5 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.map(
new MapFunction<Long, Long>() {
@Override
@@ -313,7 +313,7 @@ public class DataStreamTest extends TestLogger {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> dataStream1 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.name("testSource1")
.map(
new MapFunction<Long, Long>() {
@@ -325,7 +325,7 @@ public class DataStreamTest extends TestLogger {
.name("testMap");
DataStream<Long> dataStream2 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.name("testSource2")
.map(
new MapFunction<Long, Long>() {
@@ -657,7 +657,7 @@ public class DataStreamTest extends TestLogger {
.getStreamNode(sink.getTransformation().getId())
.getParallelism());
- DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
+ DataStreamSource<Long> parallelSource = env.fromSequence(0, 0);
parallelSource.sinkTo(new DiscardingSink<Long>());
assertEquals(7,
getStreamGraph(env).getStreamNode(parallelSource.getId()).getParallelism());
@@ -711,7 +711,7 @@ public class DataStreamTest extends TestLogger {
"setResources", ResourceSpec.class,
ResourceSpec.class);
sinkMethod.setAccessible(true);
- DataStream<Long> source1 = env.generateSequence(0, 0);
+ DataStream<Long> source1 = env.fromSequence(0, 0);
opMethod.invoke(source1, minResource1, preferredResource1);
DataStream<Long> map1 =
@@ -724,7 +724,7 @@ public class DataStreamTest extends TestLogger {
});
opMethod.invoke(map1, minResource2, preferredResource2);
- DataStream<Long> source2 = env.generateSequence(0, 0);
+ DataStream<Long> source2 = env.fromSequence(0, 0);
opMethod.invoke(source2, minResource3, preferredResource3);
DataStream<Long> map2 =
@@ -823,7 +823,7 @@ public class DataStreamTest extends TestLogger {
public void testTypeInfo() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Long> src1 = env.generateSequence(0, 0);
+ DataStream<Long> src1 = env.fromSequence(0, 0);
assertEquals(TypeExtractor.getForClass(Long.class), src1.getType());
DataStream<Tuple2<Integer, String>> map =
@@ -892,7 +892,7 @@ public class DataStreamTest extends TestLogger {
@Deprecated
public void testKeyedStreamProcessTranslation() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Long> src = env.generateSequence(0, 0);
+ DataStreamSource<Long> src = env.fromSequence(0, 0);
ProcessFunction<Long, Integer> processFunction =
new ProcessFunction<Long, Integer>() {
@@ -927,7 +927,7 @@ public class DataStreamTest extends TestLogger {
@Test
public void testKeyedStreamKeyedProcessTranslation() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Long> src = env.generateSequence(0, 0);
+ DataStreamSource<Long> src = env.fromSequence(0, 0);
KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction =
new KeyedProcessFunction<Long, Long, Integer>() {
@@ -962,7 +962,7 @@ public class DataStreamTest extends TestLogger {
@Test
public void testProcessTranslation() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Long> src = env.generateSequence(0, 0);
+ DataStreamSource<Long> src = env.fromSequence(0, 0);
ProcessFunction<Long, Integer> processFunction =
new ProcessFunction<Long, Integer>() {
@@ -1002,7 +1002,7 @@ public class DataStreamTest extends TestLogger {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<Long> srcOne =
- env.generateSequence(0L, 5L)
+ env.fromSequence(0L, 5L)
.assignTimestampsAndWatermarks(
new CustomWmEmitter<Long>() {
@@ -1056,7 +1056,7 @@ public class DataStreamTest extends TestLogger {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<Long> srcOne =
- env.generateSequence(0L, 5L)
+ env.fromSequence(0L, 5L)
.assignTimestampsAndWatermarks(
new CustomWmEmitter<Long>() {
@@ -1105,7 +1105,7 @@ public class DataStreamTest extends TestLogger {
// global window
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> dataStream1 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(10)))
.reduce(
@@ -1128,7 +1128,7 @@ public class DataStreamTest extends TestLogger {
// keyed window
DataStream<Long> dataStream2 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.keyBy(value -> value)
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
.trigger(PurgingTrigger.of(CountTrigger.of(10)))
@@ -1159,7 +1159,7 @@ public class DataStreamTest extends TestLogger {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> dataStream1 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.name("testSource1")
.setDescription("this is test source 1")
.map(
@@ -1173,7 +1173,7 @@ public class DataStreamTest extends TestLogger {
.setDescription("this is test map 1");
DataStream<Long> dataStream2 =
- env.generateSequence(0, 0)
+ env.fromSequence(0, 0)
.name("testSource2")
.setDescription("this is test source 2")
.map(
@@ -1241,7 +1241,7 @@ public class DataStreamTest extends TestLogger {
public void operatorTest() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Long> src = env.generateSequence(0, 0);
+ DataStreamSource<Long> src = env.fromSequence(0, 0);
MapFunction<Long, Integer> mapFunction =
new MapFunction<Long, Integer>() {
@@ -1343,7 +1343,7 @@ public class DataStreamTest extends TestLogger {
public void sinkKeyTest() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSink<Long> sink = env.generateSequence(1, 100).print();
+ DataStreamSink<Long> sink = env.fromSequence(1, 100).print();
assertEquals(
0,
getStreamGraph(env)
@@ -1369,7 +1369,7 @@ public class DataStreamTest extends TestLogger {
}
};
- DataStreamSink<Long> sink2 = env.generateSequence(1,
100).keyBy(key1).print();
+ DataStreamSink<Long> sink2 = env.fromSequence(1,
100).keyBy(key1).print();
assertEquals(
1,
@@ -1409,7 +1409,7 @@ public class DataStreamTest extends TestLogger {
}
};
- DataStreamSink<Long> sink3 = env.generateSequence(1,
100).keyBy(key2).print();
+ DataStreamSink<Long> sink3 = env.fromSequence(1,
100).keyBy(key2).print();
assertEquals(
1,
@@ -1435,7 +1435,7 @@ public class DataStreamTest extends TestLogger {
public void testChannelSelectors() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Long> src = env.generateSequence(0, 0);
+ DataStreamSource<Long> src = env.fromSequence(0, 0);
DataStream<Long> broadcast = src.broadcast();
DataStreamSink<Long> broadcastSink = broadcast.print();
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index cdbf30ffcf4..a6d1cb1ced0 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -55,7 +55,7 @@ public class TypeFillTest {
} catch (Exception ignored) {
}
- DataStream<Long> source = env.generateSequence(1, 10);
+ DataStream<Long> source = env.fromSequence(1, 10);
try {
source.map(new TestMap<Long, Long>()).print();
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index b2be45482fb..bf16870a8e4 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.CheckpointingOptions;
@@ -43,7 +44,6 @@ import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -181,8 +181,9 @@ class StreamExecutionEnvironmentTest {
List<Long> list = Arrays.asList(0L, 1L, 2L);
- DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-
assertThat(getFunctionFromDataSource(src2)).isInstanceOf(StatefulSequenceSource.class);
+ DataStreamSource<Long> src2 = env.fromSequence(0, 2);
+ Object generatorSource = getSourceFromStream(src2);
+ assertThat(generatorSource).isInstanceOf(NumberSequenceSource.class);
DataStreamSource<Long> src3 = env.fromData(0L, 1L, 2L);
assertThat(getSourceFromDataSourceTyped(src3)).isInstanceOf(DataGeneratorSource.class);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
index 5ada1f53518..84a20f50e7c 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -53,7 +53,7 @@ public class SlotAllocationTest extends TestLogger {
}
};
- env.generateSequence(1, 10)
+ env.fromSequence(1, 10)
.filter(dummyFilter)
.slotSharingGroup("isolated")
.filter(dummyFilter)
@@ -67,7 +67,7 @@ public class SlotAllocationTest extends TestLogger {
.disableChaining();
// verify that a second pipeline does not inherit the groups from the
first pipeline
- env.generateSequence(1, 10)
+ env.fromSequence(1, 10)
.filter(dummyFilter)
.slotSharingGroup("isolated-2")
.filter(dummyFilter)
@@ -122,14 +122,14 @@ public class SlotAllocationTest extends TestLogger {
}
};
- DataStream<Long> src1 = env.generateSequence(1, 10);
- DataStream<Long> src2 = env.generateSequence(1,
10).slotSharingGroup("src-1");
+ DataStream<Long> src1 = env.fromSequence(1, 10);
+ DataStream<Long> src2 = env.fromSequence(1,
10).slotSharingGroup("src-1");
// this should not inherit group "src-1"
src1.union(src2).filter(dummyFilter);
- DataStream<Long> src3 = env.generateSequence(1,
10).slotSharingGroup("group-1");
- DataStream<Long> src4 = env.generateSequence(1,
10).slotSharingGroup("group-1");
+ DataStream<Long> src3 = env.fromSequence(1,
10).slotSharingGroup("group-1");
+ DataStream<Long> src4 = env.fromSequence(1,
10).slotSharingGroup("group-1");
// this should inherit "group-1" now
src3.union(src4).filter(dummyFilter);
@@ -165,8 +165,8 @@ public class SlotAllocationTest extends TestLogger {
}
};
- DataStream<Long> src1 = env.generateSequence(1,
10).slotSharingGroup("group-1");
- DataStream<Long> src2 = env.generateSequence(1,
10).slotSharingGroup("group-1");
+ DataStream<Long> src1 = env.fromSequence(1,
10).slotSharingGroup("group-1");
+ DataStream<Long> src2 = env.fromSequence(1,
10).slotSharingGroup("group-1");
// this should not inherit group but be in "default"
src1.union(src2).filter(dummyFilter).slotSharingGroup("default");
@@ -198,14 +198,14 @@ public class SlotAllocationTest extends TestLogger {
}
};
- DataStream<Long> src1 = env.generateSequence(1, 10);
- DataStream<Long> src2 = env.generateSequence(1,
10).slotSharingGroup("src-1");
+ DataStream<Long> src1 = env.fromSequence(1, 10);
+ DataStream<Long> src2 = env.fromSequence(1,
10).slotSharingGroup("src-1");
// this should not inherit group "src-1"
src1.connect(src2).map(dummyCoMap);
- DataStream<Long> src3 = env.generateSequence(1,
10).slotSharingGroup("group-1");
- DataStream<Long> src4 = env.generateSequence(1,
10).slotSharingGroup("group-1");
+ DataStream<Long> src3 = env.fromSequence(1,
10).slotSharingGroup("group-1");
+ DataStream<Long> src4 = env.fromSequence(1,
10).slotSharingGroup("group-1");
// this should inherit "group-1" now
src3.connect(src4).map(dummyCoMap);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
index 80745407451..0534a30ed89 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
@@ -42,7 +42,7 @@ public class StreamGraphCoLocationConstraintTest {
env.setParallelism(7);
// set up the test program
- DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+ DataStream<Long> source = env.fromSequence(1L, 10_000_000);
source.getTransformation().setCoLocationGroupKey("group1");
DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v);
@@ -73,7 +73,7 @@ public class StreamGraphCoLocationConstraintTest {
env.setParallelism(7);
// set up the test program
- DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+ DataStream<Long> source = env.fromSequence(1L, 10_000_000);
source.getTransformation().setSlotSharingGroup("ssg1");
source.getTransformation().setCoLocationGroupKey("co1");
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
index 6fa1e14aa09..22ebebdcd8a 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
@@ -70,7 +70,7 @@ public class TranslationTest {
private static StreamExecutionEnvironment getSimpleJob() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.generateSequence(1, 10000000)
+ env.fromSequence(1, 10000000)
.addSink(
new SinkFunction<Long>() {
@Override
diff --git
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
index 38fad1c1be9..d6c847f1562 100644
---
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
+++
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
@@ -54,7 +54,7 @@ class BroadcastStateITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val srcOne = env
- .generateSequence(0L, 5L)
+ .fromSequence(0L, 5L)
.assignTimestampsAndWatermarks(new
AssignerWithPunctuatedWatermarks[Long]() {
override def extractTimestamp(element: Long, previousElementTimestamp:
Long): Long =
diff --git
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index f090b6d7f26..900bd8c99eb 100644
---
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -51,7 +51,7 @@ class DataStreamTest extends AbstractTestBase {
def testNaming(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val source1Operator = env.generateSequence(0, 0).name("testSource1")
+ val source1Operator = env.fromSequence(0, 0).name("testSource1")
val source1 = source1Operator
assert("testSource1" == source1Operator.getName)
@@ -61,7 +61,7 @@ class DataStreamTest extends AbstractTestBase {
assert("testMap" == dataStream1.getName)
val dataStream2 = env
- .generateSequence(0, 0)
+ .fromSequence(0, 0)
.name("testSource2")
.keyBy(x => x)
.reduce((x, y) => 0L)
@@ -103,12 +103,12 @@ class DataStreamTest extends AbstractTestBase {
def testUserDefinedDescription(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream1 = env
- .generateSequence(0, 0)
+ .fromSequence(0, 0)
.setDescription("this is test source 1")
.map(x => x)
.setDescription("this is test map 1")
val dataStream2 = env
- .generateSequence(0, 0)
+ .fromSequence(0, 0)
.setDescription("this is test source 2")
.map(x => x)
.setDescription("this is test map 2")
@@ -327,7 +327,7 @@ class DataStreamTest extends AbstractTestBase {
.getStreamNode(sink.getTransformation.getId)
.getParallelism)
- val parallelSource = env.generateSequence(0, 0)
+ val parallelSource = env.fromSequence(0, 0)
parallelSource.print()
assert(newParallelism ==
getStreamGraph(env).getStreamNode(parallelSource.getId).getParallelism)
@@ -439,7 +439,7 @@ class DataStreamTest extends AbstractTestBase {
def testTypeInfo() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val src1: DataStream[Long] = env.generateSequence(0, 0)
+ val src1: DataStream[Long] = env.fromSequence(0, 0)
assert(TypeExtractor.getForClass(classOf[Long]) == src1.getType)
val map: DataStream[(Integer, String)] = src1.map(x => null)
@@ -476,7 +476,7 @@ class DataStreamTest extends AbstractTestBase {
def testKeyedStreamProcessTranslation(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val src = env.generateSequence(0, 0)
+ val src = env.fromSequence(0, 0)
val processFunction = new ProcessFunction[Long, Int] {
override def processElement(
@@ -499,7 +499,7 @@ class DataStreamTest extends AbstractTestBase {
def testKeyedStreamKeyedProcessTranslation(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val src = env.generateSequence(0, 0)
+ val src = env.fromSequence(0, 0)
val keyedProcessFunction = new KeyedProcessFunction[Long, Long, Int] {
override def processElement(
@@ -522,7 +522,7 @@ class DataStreamTest extends AbstractTestBase {
def testProcessTranslation(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val src = env.generateSequence(0, 0)
+ val src = env.fromSequence(0, 0)
val processFunction = new ProcessFunction[Long, Int] {
override def processElement(
@@ -540,7 +540,7 @@ class DataStreamTest extends AbstractTestBase {
@Test def operatorTest() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val src = env.generateSequence(0, 0)
+ val src = env.fromSequence(0, 0)
val mapFunction = new MapFunction[Long, Int] {
override def map(value: Long): Int = 0
@@ -632,7 +632,7 @@ class DataStreamTest extends AbstractTestBase {
def testChannelSelectors() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val src = env.generateSequence(0, 0)
+ val src = env.fromSequence(0, 0)
val broadcast = src.broadcast
val broadcastSink = broadcast.print()
diff --git
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SlotAllocationTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SlotAllocationTest.scala
index 38ff055760a..e1f46b20b68 100644
---
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SlotAllocationTest.scala
+++
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SlotAllocationTest.scala
@@ -41,7 +41,7 @@ class SlotAllocationTest {
}
env
- .generateSequence(1, 10)
+ .fromSequence(1, 10)
.filter(dummyFilter)
.slotSharingGroup("isolated")
.filter(dummyFilter)
@@ -56,7 +56,7 @@ class SlotAllocationTest {
// verify that a second pipeline does not inherit the groups from the
first pipeline
env
- .generateSequence(1, 10)
+ .fromSequence(1, 10)
.filter(dummyFilter)
.slotSharingGroup("isolated-2")
.filter(dummyFilter)
diff --git
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
index d1ba6ea22f3..b56177261f5 100644
---
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
+++
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
@@ -29,7 +29,7 @@ object StateTestPrograms {
// test stateful map
env
- .generateSequence(0, 10)
+ .fromSequence(0, 10)
.setParallelism(1)
.map(v => (1, v))
.setParallelism(1)
@@ -77,7 +77,7 @@ object StateTestPrograms {
// test stateful filter
env
- .generateSequence(1, 10)
+ .fromSequence(1, 10)
.keyBy(_ % 2)
.filterWithState(
(in, state: Option[Int]) =>
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/LocalStreamEnvironmentITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/LocalStreamEnvironmentITCase.java
index b33ad6604ec..c170d2d47c5 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/LocalStreamEnvironmentITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -62,7 +62,7 @@ public class LocalStreamEnvironmentITCase extends TestLogger {
// ------------------------------------------------------------------------
private static void addSmallBoundedJob(StreamExecutionEnvironment env, int
parallelism) {
- DataStream<Long> stream = env.generateSequence(1,
100).setParallelism(parallelism);
+ DataStream<Long> stream = env.fromSequence(1,
100).setParallelism(parallelism);
stream.filter(ignored -> false)
.setParallelism(parallelism)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
index ae6acd14f22..d0d89d32820 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
@@ -43,7 +43,7 @@ public class CollectITCase extends AbstractTestBase {
env.setParallelism(1);
final long n = 10;
- DataStream<Long> stream = env.generateSequence(1, n);
+ DataStream<Long> stream = env.fromSequence(1, n);
long i = 1;
for (Iterator<Long> it = DataStreamUtils.collect(stream);
it.hasNext(); ) {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index e9822515434..ef54495dcfe 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -69,7 +69,7 @@ public class BroadcastStateITCase extends AbstractTestBase {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<Long> srcOne =
- env.generateSequence(0L, 5L)
+ env.fromSequence(0L, 5L)
.assignTimestampsAndWatermarks(
new CustomWmEmitter<Long>() {
@@ -130,7 +130,7 @@ public class BroadcastStateITCase extends AbstractTestBase {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<Long> srcOne =
- env.generateSequence(0L, 5L)
+ env.fromSequence(0L, 5L)
.assignTimestampsAndWatermarks(
new CustomWmEmitter<Long>() {