This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 369d4a6ce46dddfd2a14ad66ac2e189bd4827158 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Wed Jun 24 21:22:18 2020 +0800 [FLINK-18428][API/DataStream] Rename StreamExecutionEnvironment#continuousSource() to StreamExecutionEnvironment#fromSource(). This closes #12766 --- docs/dev/stream/sources.md | 6 +++--- docs/dev/stream/sources.zh.md | 6 +++--- .../flink/connector/base/source/reader/CoordinatedSourceITCase.java | 6 +++--- .../tests/test_stream_execution_environment_completeness.py | 2 +- .../flink/streaming/api/environment/StreamExecutionEnvironment.java | 6 +++--- .../flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java | 4 ++-- .../flink/streaming/api/scala/StreamExecutionEnvironment.scala | 4 ++-- .../flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala | 2 +- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md index 669ca8f..3c3db90 100644 --- a/docs/dev/stream/sources.md +++ b/docs/dev/stream/sources.md @@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn Source mySource = new MySource(...); -DataStream<Integer> stream = env.continuousSource( +DataStream<Integer> stream = env.fromSource( mySource, WatermarkStrategy.noWatermarks(), "MySourceName"); @@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment() val mySource = new MySource(...) 
-val stream = env.continuousSource( +val stream = env.fromSource( mySource, WatermarkStrategy.noWatermarks(), "MySourceName") @@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java). {% highlight java %} -environment.continuousSource( +environment.fromSource( Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName) diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md index 3f20388..a063ecb 100644 --- a/docs/dev/stream/sources.zh.md +++ b/docs/dev/stream/sources.zh.md @@ -187,7 +187,7 @@ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEn Source mySource = new MySource(...); -DataStream<Integer> stream = env.continuousSource( +DataStream<Integer> stream = env.fromSource( mySource, WatermarkStrategy.noWatermarks(), "MySourceName"); @@ -200,7 +200,7 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment() val mySource = new MySource(...) 
-val stream = env.continuousSource( +val stream = env.fromSource( mySource, WatermarkStrategy.noWatermarks(), "MySourceName") @@ -352,7 +352,7 @@ Apparently, the `SourceReader` implementations can also implement their own thre The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the [TimestampAssigner](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java) and [WatermarkGenerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java). {% highlight java %} -environment.continuousSource( +environment.fromSource( Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java index 6582210..3280c38 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java @@ -45,7 +45,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase { public void testEnumeratorReaderCommunication() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED); - DataStream<Integer> stream = env.continuousSource( + DataStream<Integer> stream = env.fromSource( source, WatermarkStrategy.noWatermarks(), "TestingSource"); @@ -57,11 +57,11 @@ public class CoordinatedSourceITCase extends AbstractTestBase { StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED); MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED); - DataStream<Integer> stream1 = env.continuousSource( + DataStream<Integer> stream1 = env.fromSource( source1, WatermarkStrategy.noWatermarks(), "TestingSource1"); - DataStream<Integer> stream2 = env.continuousSource( + DataStream<Integer> stream2 = env.fromSource( source2, WatermarkStrategy.noWatermarks(), "TestingSource2"); diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py index c91e086..9764cb4 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py @@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase, 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection', 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource', 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener', - 'clearJobListeners', 'getJobListeners', "continuousSource"} + 'clearJobListeners', 'getJobListeners', "fromSource"} if __name__ == '__main__': diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 7de2e97..59837ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1629,11 +1629,11 @@ public class StreamExecutionEnvironment { * @return the data 
stream constructed */ @Experimental - public <OUT> DataStreamSource<OUT> continuousSource( + public <OUT> DataStreamSource<OUT> fromSource( Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName) { - return continuousSource(source, timestampsAndWatermarks, sourceName, null); + return fromSource(source, timestampsAndWatermarks, sourceName, null); } /** @@ -1650,7 +1650,7 @@ public class StreamExecutionEnvironment { * @return the data stream constructed */ @Experimental - public <OUT> DataStreamSource<OUT> continuousSource( + public <OUT> DataStreamSource<OUT> fromSource( Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 42edc70..c947325 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -283,7 +283,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { @Test public void testOperatorCoordinatorAddedToJobVertex() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream<Integer> stream = env.continuousSource( + DataStream<Integer> stream = env.fromSource( new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestingSource"); @@ -493,7 +493,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { @Test public void testCoordinatedOperator() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream<Integer> source = env.continuousSource( + DataStream<Integer> source = env.fromSource( new MockSource(Boundedness.BOUNDED, 1), 
WatermarkStrategy.noWatermarks(), "TestSource"); diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 925d571..9ab3acf 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -666,13 +666,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * Create a DataStream using a [[Source]]. */ @Experimental - def continuousSource[T: TypeInformation]( + def fromSource[T: TypeInformation]( source: Source[T, _ <: SourceSplit, _], watermarkStrategy: WatermarkStrategy[T], sourceName: String): DataStream[T] = { val typeInfo = implicitly[TypeInformation[T]] - asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo)) + asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo)) } /** diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala index fa503e0..8765cb3 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala @@ -40,7 +40,7 @@ class StreamExecutionEnvironmentTest { implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo() val env = StreamExecutionEnvironment.getExecutionEnvironment - val stream = env.continuousSource( + val stream = env.fromSource( new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks(), "test source")
