This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e74592ca92f [FLINK-33755] Cleanup usage of deprecated 
StreamExecutionEnvironment#generateSequence
e74592ca92f is described below

commit e74592ca92f4eac5bef6e5140ae4e8cc2f0bf1a1
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Dec 5 17:33:29 2023 +0100

    [FLINK-33755] Cleanup usage of deprecated 
StreamExecutionEnvironment#generateSequence
---
 docs/content.zh/docs/dev/datastream/overview.md    |  4 +-
 docs/content/docs/dev/datastream/overview.md       |  4 +-
 .../apache/flink/streaming/api/DataStreamTest.java | 52 +++++++++++-----------
 .../apache/flink/streaming/api/TypeFillTest.java   |  2 +-
 .../StreamExecutionEnvironmentTest.java            |  7 +--
 .../streaming/api/graph/SlotAllocationTest.java    | 24 +++++-----
 .../graph/StreamGraphCoLocationConstraintTest.java |  4 +-
 .../flink/streaming/graph/TranslationTest.java     |  2 +-
 .../streaming/api/scala/BroadcastStateITCase.scala |  2 +-
 .../flink/streaming/api/scala/DataStreamTest.scala | 22 ++++-----
 .../streaming/api/scala/SlotAllocationTest.scala   |  4 +-
 .../streaming/api/scala/StateTestPrograms.scala    |  4 +-
 .../environment/LocalStreamEnvironmentITCase.java  |  2 +-
 .../test/streaming/experimental/CollectITCase.java |  2 +-
 .../streaming/runtime/BroadcastStateITCase.java    |  4 +-
 15 files changed, 70 insertions(+), 69 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/overview.md 
b/docs/content.zh/docs/dev/datastream/overview.md
index 9544803692e..75c05d5dd22 100644
--- a/docs/content.zh/docs/dev/datastream/overview.md
+++ b/docs/content.zh/docs/dev/datastream/overview.md
@@ -313,7 +313,7 @@ Source 是你的程序从中读取其输入的地方。你可以用 `StreamExecu
   
 - `fromParallelCollection(SplittableIterator, Class)` - 从迭代器并行创建数据流。class 
参数指定迭代器返回元素的数据类型。
   
-- `generateSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。
+- `fromSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。
 
 自定义:
 
@@ -357,7 +357,7 @@ Source 是你的程序从中读取其输入的地方。你可以用 `StreamExecu
   
 - `fromParallelCollection(SplittableIterator, Class)` - 从迭代器并行创建数据流。class 
参数指定迭代器返回元素的数据类型。
   
-- `generateSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。
+- `fromSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。
 
 自定义:
 
diff --git a/docs/content/docs/dev/datastream/overview.md 
b/docs/content/docs/dev/datastream/overview.md
index b72fcf7c871..0034cbd17d9 100644
--- a/docs/content/docs/dev/datastream/overview.md
+++ b/docs/content/docs/dev/datastream/overview.md
@@ -386,7 +386,7 @@ Collection-based:
 - `fromParallelCollection(SplittableIterator, Class)` - Creates a data stream 
from an iterator, in
   parallel. The class specifies the data type of the elements returned by the 
iterator.
 
-- `generateSequence(from, to)` - Generates the sequence of numbers in the 
given interval, in
+- `fromSequence(from, to)` - Generates the sequence of numbers in the given 
interval, in
   parallel.
 
 Custom:
@@ -441,7 +441,7 @@ Collection-based:
 - `fromParallelCollection(SplittableIterator)` - Creates a data stream from an 
iterator, in
   parallel. The class specifies the data type of the elements returned by the 
iterator.
 
-- `generateSequence(from, to)` - Generates the sequence of numbers in the 
given interval, in
+- `fromSequence(from, to)` - Generates the sequence of numbers in the given 
interval, in
   parallel.
 
 Custom:
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 936d853c010..6b75b1d2ee7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -140,7 +140,7 @@ public class DataStreamTest extends TestLogger {
         env.setParallelism(4);
 
         DataStream<Long> input1 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .map(
                                 new MapFunction<Long, Long>() {
                                     @Override
@@ -160,7 +160,7 @@ public class DataStreamTest extends TestLogger {
                                 });
 
         DataStream<Long> input6 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .map(
                                 new MapFunction<Long, Long>() {
                                     @Override
@@ -181,7 +181,7 @@ public class DataStreamTest extends TestLogger {
                                 });
 
         DataStream<Long> input2 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .map(
                                 new MapFunction<Long, Long>() {
                                     @Override
@@ -192,7 +192,7 @@ public class DataStreamTest extends TestLogger {
                         .setParallelism(4);
 
         DataStream<Long> input3 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .map(
                                 new MapFunction<Long, Long>() {
                                     @Override
@@ -214,7 +214,7 @@ public class DataStreamTest extends TestLogger {
                         .setParallelism(4);
 
         DataStream<Long> input4 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .map(
                                 new MapFunction<Long, Long>() {
                                     @Override
@@ -225,7 +225,7 @@ public class DataStreamTest extends TestLogger {
                         .setParallelism(2);
 
         DataStream<Long> input5 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .map(
                                 new MapFunction<Long, Long>() {
                                     @Override
@@ -313,7 +313,7 @@ public class DataStreamTest extends TestLogger {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Long> dataStream1 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .name("testSource1")
                         .map(
                                 new MapFunction<Long, Long>() {
@@ -325,7 +325,7 @@ public class DataStreamTest extends TestLogger {
                         .name("testMap");
 
         DataStream<Long> dataStream2 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .name("testSource2")
                         .map(
                                 new MapFunction<Long, Long>() {
@@ -657,7 +657,7 @@ public class DataStreamTest extends TestLogger {
                         .getStreamNode(sink.getTransformation().getId())
                         .getParallelism());
 
-        DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
+        DataStreamSource<Long> parallelSource = env.fromSequence(0, 0);
         parallelSource.sinkTo(new DiscardingSink<Long>());
         assertEquals(7, 
getStreamGraph(env).getStreamNode(parallelSource.getId()).getParallelism());
 
@@ -711,7 +711,7 @@ public class DataStreamTest extends TestLogger {
                         "setResources", ResourceSpec.class, 
ResourceSpec.class);
         sinkMethod.setAccessible(true);
 
-        DataStream<Long> source1 = env.generateSequence(0, 0);
+        DataStream<Long> source1 = env.fromSequence(0, 0);
         opMethod.invoke(source1, minResource1, preferredResource1);
 
         DataStream<Long> map1 =
@@ -724,7 +724,7 @@ public class DataStreamTest extends TestLogger {
                         });
         opMethod.invoke(map1, minResource2, preferredResource2);
 
-        DataStream<Long> source2 = env.generateSequence(0, 0);
+        DataStream<Long> source2 = env.fromSequence(0, 0);
         opMethod.invoke(source2, minResource3, preferredResource3);
 
         DataStream<Long> map2 =
@@ -823,7 +823,7 @@ public class DataStreamTest extends TestLogger {
     public void testTypeInfo() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-        DataStream<Long> src1 = env.generateSequence(0, 0);
+        DataStream<Long> src1 = env.fromSequence(0, 0);
         assertEquals(TypeExtractor.getForClass(Long.class), src1.getType());
 
         DataStream<Tuple2<Integer, String>> map =
@@ -892,7 +892,7 @@ public class DataStreamTest extends TestLogger {
     @Deprecated
     public void testKeyedStreamProcessTranslation() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStreamSource<Long> src = env.generateSequence(0, 0);
+        DataStreamSource<Long> src = env.fromSequence(0, 0);
 
         ProcessFunction<Long, Integer> processFunction =
                 new ProcessFunction<Long, Integer>() {
@@ -927,7 +927,7 @@ public class DataStreamTest extends TestLogger {
     @Test
     public void testKeyedStreamKeyedProcessTranslation() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStreamSource<Long> src = env.generateSequence(0, 0);
+        DataStreamSource<Long> src = env.fromSequence(0, 0);
 
         KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction =
                 new KeyedProcessFunction<Long, Long, Integer>() {
@@ -962,7 +962,7 @@ public class DataStreamTest extends TestLogger {
     @Test
     public void testProcessTranslation() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStreamSource<Long> src = env.generateSequence(0, 0);
+        DataStreamSource<Long> src = env.fromSequence(0, 0);
 
         ProcessFunction<Long, Integer> processFunction =
                 new ProcessFunction<Long, Integer>() {
@@ -1002,7 +1002,7 @@ public class DataStreamTest extends TestLogger {
 
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         final DataStream<Long> srcOne =
-                env.generateSequence(0L, 5L)
+                env.fromSequence(0L, 5L)
                         .assignTimestampsAndWatermarks(
                                 new CustomWmEmitter<Long>() {
 
@@ -1056,7 +1056,7 @@ public class DataStreamTest extends TestLogger {
 
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         final DataStream<Long> srcOne =
-                env.generateSequence(0L, 5L)
+                env.fromSequence(0L, 5L)
                         .assignTimestampsAndWatermarks(
                                 new CustomWmEmitter<Long>() {
 
@@ -1105,7 +1105,7 @@ public class DataStreamTest extends TestLogger {
         // global window
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         DataStream<Long> dataStream1 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .windowAll(GlobalWindows.create())
                         .trigger(PurgingTrigger.of(CountTrigger.of(10)))
                         .reduce(
@@ -1128,7 +1128,7 @@ public class DataStreamTest extends TestLogger {
 
         // keyed window
         DataStream<Long> dataStream2 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .keyBy(value -> value)
                         
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
                         .trigger(PurgingTrigger.of(CountTrigger.of(10)))
@@ -1159,7 +1159,7 @@ public class DataStreamTest extends TestLogger {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Long> dataStream1 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .name("testSource1")
                         .setDescription("this is test source 1")
                         .map(
@@ -1173,7 +1173,7 @@ public class DataStreamTest extends TestLogger {
                         .setDescription("this is test map 1");
 
         DataStream<Long> dataStream2 =
-                env.generateSequence(0, 0)
+                env.fromSequence(0, 0)
                         .name("testSource2")
                         .setDescription("this is test source 2")
                         .map(
@@ -1241,7 +1241,7 @@ public class DataStreamTest extends TestLogger {
     public void operatorTest() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-        DataStreamSource<Long> src = env.generateSequence(0, 0);
+        DataStreamSource<Long> src = env.fromSequence(0, 0);
 
         MapFunction<Long, Integer> mapFunction =
                 new MapFunction<Long, Integer>() {
@@ -1343,7 +1343,7 @@ public class DataStreamTest extends TestLogger {
     public void sinkKeyTest() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-        DataStreamSink<Long> sink = env.generateSequence(1, 100).print();
+        DataStreamSink<Long> sink = env.fromSequence(1, 100).print();
         assertEquals(
                 0,
                 getStreamGraph(env)
@@ -1369,7 +1369,7 @@ public class DataStreamTest extends TestLogger {
                     }
                 };
 
-        DataStreamSink<Long> sink2 = env.generateSequence(1, 
100).keyBy(key1).print();
+        DataStreamSink<Long> sink2 = env.fromSequence(1, 
100).keyBy(key1).print();
 
         assertEquals(
                 1,
@@ -1409,7 +1409,7 @@ public class DataStreamTest extends TestLogger {
                     }
                 };
 
-        DataStreamSink<Long> sink3 = env.generateSequence(1, 
100).keyBy(key2).print();
+        DataStreamSink<Long> sink3 = env.fromSequence(1, 
100).keyBy(key2).print();
 
         assertEquals(
                 1,
@@ -1435,7 +1435,7 @@ public class DataStreamTest extends TestLogger {
     public void testChannelSelectors() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-        DataStreamSource<Long> src = env.generateSequence(0, 0);
+        DataStreamSource<Long> src = env.fromSequence(0, 0);
 
         DataStream<Long> broadcast = src.broadcast();
         DataStreamSink<Long> broadcastSink = broadcast.print();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index cdbf30ffcf4..a6d1cb1ced0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -55,7 +55,7 @@ public class TypeFillTest {
         } catch (Exception ignored) {
         }
 
-        DataStream<Long> source = env.generateSequence(1, 10);
+        DataStream<Long> source = env.fromSequence(1, 10);
 
         try {
             source.map(new TestMap<Long, Long>()).print();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index b2be45482fb..bf16870a8e4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -43,7 +44,6 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -181,8 +181,9 @@ class StreamExecutionEnvironmentTest {
 
         List<Long> list = Arrays.asList(0L, 1L, 2L);
 
-        DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-        
assertThat(getFunctionFromDataSource(src2)).isInstanceOf(StatefulSequenceSource.class);
+        DataStreamSource<Long> src2 = env.fromSequence(0, 2);
+        Object generatorSource = getSourceFromStream(src2);
+        assertThat(generatorSource).isInstanceOf(NumberSequenceSource.class);
 
         DataStreamSource<Long> src3 = env.fromData(0L, 1L, 2L);
         
assertThat(getSourceFromDataSourceTyped(src3)).isInstanceOf(DataGeneratorSource.class);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
index 5ada1f53518..84a20f50e7c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -53,7 +53,7 @@ public class SlotAllocationTest extends TestLogger {
                     }
                 };
 
-        env.generateSequence(1, 10)
+        env.fromSequence(1, 10)
                 .filter(dummyFilter)
                 .slotSharingGroup("isolated")
                 .filter(dummyFilter)
@@ -67,7 +67,7 @@ public class SlotAllocationTest extends TestLogger {
                 .disableChaining();
 
         // verify that a second pipeline does not inherit the groups from the 
first pipeline
-        env.generateSequence(1, 10)
+        env.fromSequence(1, 10)
                 .filter(dummyFilter)
                 .slotSharingGroup("isolated-2")
                 .filter(dummyFilter)
@@ -122,14 +122,14 @@ public class SlotAllocationTest extends TestLogger {
                     }
                 };
 
-        DataStream<Long> src1 = env.generateSequence(1, 10);
-        DataStream<Long> src2 = env.generateSequence(1, 
10).slotSharingGroup("src-1");
+        DataStream<Long> src1 = env.fromSequence(1, 10);
+        DataStream<Long> src2 = env.fromSequence(1, 
10).slotSharingGroup("src-1");
 
         // this should not inherit group "src-1"
         src1.union(src2).filter(dummyFilter);
 
-        DataStream<Long> src3 = env.generateSequence(1, 
10).slotSharingGroup("group-1");
-        DataStream<Long> src4 = env.generateSequence(1, 
10).slotSharingGroup("group-1");
+        DataStream<Long> src3 = env.fromSequence(1, 
10).slotSharingGroup("group-1");
+        DataStream<Long> src4 = env.fromSequence(1, 
10).slotSharingGroup("group-1");
 
         // this should inherit "group-1" now
         src3.union(src4).filter(dummyFilter);
@@ -165,8 +165,8 @@ public class SlotAllocationTest extends TestLogger {
                     }
                 };
 
-        DataStream<Long> src1 = env.generateSequence(1, 
10).slotSharingGroup("group-1");
-        DataStream<Long> src2 = env.generateSequence(1, 
10).slotSharingGroup("group-1");
+        DataStream<Long> src1 = env.fromSequence(1, 
10).slotSharingGroup("group-1");
+        DataStream<Long> src2 = env.fromSequence(1, 
10).slotSharingGroup("group-1");
 
         // this should not inherit group but be in "default"
         src1.union(src2).filter(dummyFilter).slotSharingGroup("default");
@@ -198,14 +198,14 @@ public class SlotAllocationTest extends TestLogger {
                     }
                 };
 
-        DataStream<Long> src1 = env.generateSequence(1, 10);
-        DataStream<Long> src2 = env.generateSequence(1, 
10).slotSharingGroup("src-1");
+        DataStream<Long> src1 = env.fromSequence(1, 10);
+        DataStream<Long> src2 = env.fromSequence(1, 
10).slotSharingGroup("src-1");
 
         // this should not inherit group "src-1"
         src1.connect(src2).map(dummyCoMap);
 
-        DataStream<Long> src3 = env.generateSequence(1, 
10).slotSharingGroup("group-1");
-        DataStream<Long> src4 = env.generateSequence(1, 
10).slotSharingGroup("group-1");
+        DataStream<Long> src3 = env.fromSequence(1, 
10).slotSharingGroup("group-1");
+        DataStream<Long> src4 = env.fromSequence(1, 
10).slotSharingGroup("group-1");
 
         // this should inherit "group-1" now
         src3.connect(src4).map(dummyCoMap);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
index 80745407451..0534a30ed89 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamGraphCoLocationConstraintTest.java
@@ -42,7 +42,7 @@ public class StreamGraphCoLocationConstraintTest {
         env.setParallelism(7);
 
         // set up the test program
-        DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+        DataStream<Long> source = env.fromSequence(1L, 10_000_000);
         source.getTransformation().setCoLocationGroupKey("group1");
 
         DataStream<Long> step1 = source.keyBy(v -> v).map(v -> v);
@@ -73,7 +73,7 @@ public class StreamGraphCoLocationConstraintTest {
         env.setParallelism(7);
 
         // set up the test program
-        DataStream<Long> source = env.generateSequence(1L, 10_000_000);
+        DataStream<Long> source = env.fromSequence(1L, 10_000_000);
         source.getTransformation().setSlotSharingGroup("ssg1");
         source.getTransformation().setCoLocationGroupKey("co1");
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
index 6fa1e14aa09..22ebebdcd8a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
@@ -70,7 +70,7 @@ public class TranslationTest {
 
     private static StreamExecutionEnvironment getSimpleJob() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.generateSequence(1, 10000000)
+        env.fromSequence(1, 10000000)
                 .addSink(
                         new SinkFunction<Long>() {
                             @Override
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
index 38fad1c1be9..d6c847f1562 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
@@ -54,7 +54,7 @@ class BroadcastStateITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val srcOne = env
-      .generateSequence(0L, 5L)
+      .fromSequence(0L, 5L)
       .assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks[Long]() {
 
         override def extractTimestamp(element: Long, previousElementTimestamp: 
Long): Long =
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index f090b6d7f26..900bd8c99eb 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -51,7 +51,7 @@ class DataStreamTest extends AbstractTestBase {
   def testNaming(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val source1Operator = env.generateSequence(0, 0).name("testSource1")
+    val source1Operator = env.fromSequence(0, 0).name("testSource1")
     val source1 = source1Operator
     assert("testSource1" == source1Operator.getName)
 
@@ -61,7 +61,7 @@ class DataStreamTest extends AbstractTestBase {
     assert("testMap" == dataStream1.getName)
 
     val dataStream2 = env
-      .generateSequence(0, 0)
+      .fromSequence(0, 0)
       .name("testSource2")
       .keyBy(x => x)
       .reduce((x, y) => 0L)
@@ -103,12 +103,12 @@ class DataStreamTest extends AbstractTestBase {
   def testUserDefinedDescription(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val dataStream1 = env
-      .generateSequence(0, 0)
+      .fromSequence(0, 0)
       .setDescription("this is test source 1")
       .map(x => x)
       .setDescription("this is test map 1")
     val dataStream2 = env
-      .generateSequence(0, 0)
+      .fromSequence(0, 0)
       .setDescription("this is test source 2")
       .map(x => x)
       .setDescription("this is test map 2")
@@ -327,7 +327,7 @@ class DataStreamTest extends AbstractTestBase {
         .getStreamNode(sink.getTransformation.getId)
         .getParallelism)
 
-    val parallelSource = env.generateSequence(0, 0)
+    val parallelSource = env.fromSequence(0, 0)
     parallelSource.print()
 
     assert(newParallelism == 
getStreamGraph(env).getStreamNode(parallelSource.getId).getParallelism)
@@ -439,7 +439,7 @@ class DataStreamTest extends AbstractTestBase {
   def testTypeInfo() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val src1: DataStream[Long] = env.generateSequence(0, 0)
+    val src1: DataStream[Long] = env.fromSequence(0, 0)
     assert(TypeExtractor.getForClass(classOf[Long]) == src1.getType)
 
     val map: DataStream[(Integer, String)] = src1.map(x => null)
@@ -476,7 +476,7 @@ class DataStreamTest extends AbstractTestBase {
   def testKeyedStreamProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val src = env.generateSequence(0, 0)
+    val src = env.fromSequence(0, 0)
 
     val processFunction = new ProcessFunction[Long, Int] {
       override def processElement(
@@ -499,7 +499,7 @@ class DataStreamTest extends AbstractTestBase {
   def testKeyedStreamKeyedProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val src = env.generateSequence(0, 0)
+    val src = env.fromSequence(0, 0)
 
     val keyedProcessFunction = new KeyedProcessFunction[Long, Long, Int] {
       override def processElement(
@@ -522,7 +522,7 @@ class DataStreamTest extends AbstractTestBase {
   def testProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val src = env.generateSequence(0, 0)
+    val src = env.fromSequence(0, 0)
 
     val processFunction = new ProcessFunction[Long, Int] {
       override def processElement(
@@ -540,7 +540,7 @@ class DataStreamTest extends AbstractTestBase {
   @Test def operatorTest() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val src = env.generateSequence(0, 0)
+    val src = env.fromSequence(0, 0)
 
     val mapFunction = new MapFunction[Long, Int] {
       override def map(value: Long): Int = 0
@@ -632,7 +632,7 @@ class DataStreamTest extends AbstractTestBase {
   def testChannelSelectors() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    val src = env.generateSequence(0, 0)
+    val src = env.fromSequence(0, 0)
 
     val broadcast = src.broadcast
     val broadcastSink = broadcast.print()
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SlotAllocationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SlotAllocationTest.scala
index 38ff055760a..e1f46b20b68 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SlotAllocationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SlotAllocationTest.scala
@@ -41,7 +41,7 @@ class SlotAllocationTest {
     }
 
     env
-      .generateSequence(1, 10)
+      .fromSequence(1, 10)
       .filter(dummyFilter)
       .slotSharingGroup("isolated")
       .filter(dummyFilter)
@@ -56,7 +56,7 @@ class SlotAllocationTest {
 
     // verify that a second pipeline does not inherit the groups from the 
first pipeline
     env
-      .generateSequence(1, 10)
+      .fromSequence(1, 10)
       .filter(dummyFilter)
       .slotSharingGroup("isolated-2")
       .filter(dummyFilter)
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
index d1ba6ea22f3..b56177261f5 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
@@ -29,7 +29,7 @@ object StateTestPrograms {
 
     // test stateful map
     env
-      .generateSequence(0, 10)
+      .fromSequence(0, 10)
       .setParallelism(1)
       .map(v => (1, v))
       .setParallelism(1)
@@ -77,7 +77,7 @@ object StateTestPrograms {
 
     // test stateful filter
     env
-      .generateSequence(1, 10)
+      .fromSequence(1, 10)
       .keyBy(_ % 2)
       .filterWithState(
         (in, state: Option[Int]) =>
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/LocalStreamEnvironmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/LocalStreamEnvironmentITCase.java
index b33ad6604ec..c170d2d47c5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/LocalStreamEnvironmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -62,7 +62,7 @@ public class LocalStreamEnvironmentITCase extends TestLogger {
     // ------------------------------------------------------------------------
 
     private static void addSmallBoundedJob(StreamExecutionEnvironment env, int 
parallelism) {
-        DataStream<Long> stream = env.generateSequence(1, 
100).setParallelism(parallelism);
+        DataStream<Long> stream = env.fromSequence(1, 
100).setParallelism(parallelism);
 
         stream.filter(ignored -> false)
                 .setParallelism(parallelism)
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
index ae6acd14f22..d0d89d32820 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
@@ -43,7 +43,7 @@ public class CollectITCase extends AbstractTestBase {
         env.setParallelism(1);
 
         final long n = 10;
-        DataStream<Long> stream = env.generateSequence(1, n);
+        DataStream<Long> stream = env.fromSequence(1, n);
 
         long i = 1;
         for (Iterator<Long> it = DataStreamUtils.collect(stream); 
it.hasNext(); ) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index e9822515434..ef54495dcfe 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -69,7 +69,7 @@ public class BroadcastStateITCase extends AbstractTestBase {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         final DataStream<Long> srcOne =
-                env.generateSequence(0L, 5L)
+                env.fromSequence(0L, 5L)
                         .assignTimestampsAndWatermarks(
                                 new CustomWmEmitter<Long>() {
 
@@ -130,7 +130,7 @@ public class BroadcastStateITCase extends AbstractTestBase {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         final DataStream<Long> srcOne =
-                env.generateSequence(0L, 5L)
+                env.fromSequence(0L, 5L)
                         .assignTimestampsAndWatermarks(
                                 new CustomWmEmitter<Long>() {
 

Reply via email to