shuiqiangchen commented on a change in pull request #13095:
URL: https://github.com/apache/flink/pull/13095#discussion_r467686241
##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -447,6 +447,23 @@ def get_execution_environment():
.StreamExecutionEnvironment.getExecutionEnvironment()
return StreamExecutionEnvironment(j_stream_exection_environment)
+ def add_source(self, source_func: SourceFunction, source_name: str = 'Custom Source',
+ type_info: TypeInformation = None) -> 'DataStream':
+ """
+ Adds a data source to the streaming topology.
+
+ :param source_func: the user defined function.
+ :param source_name: name of the data source. Optional.
+ :param type_info: type of the returned stream. Optional.
+ :return: the data stream constructed.
+ """
+ j_type_info = type_info.get_java_type_info() if type_info is not None else None
+ j_data_stream = self._j_stream_execution_environment.addSource(source_func
Review comment:
Maybe we can do this with a single call to
`addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)`,
since `StreamExecutionEnvironment.addSource(SourceFunction<OUT> function, String sourceName)`
just delegates to that three-argument overload with `typeInfo` set to null.
Passing `j_type_info` directly (even when it is None) would then cover both cases
without branching on the Python side.
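
A rough sketch of what the single-call version might look like. `FakeJavaEnv` and `FakeTypeInfo` are hypothetical stand-ins so the snippet runs without a JVM; they are not Flink or PyFlink classes, and `add_source` is written as a free function here only for illustration. How `source_func` should be handed to the Java side is not shown in the truncated diff, so it is passed through unchanged.

```python
from typing import Any, Optional


class FakeJavaEnv:
    """Hypothetical stand-in for the Java StreamExecutionEnvironment."""

    def __init__(self):
        self.calls = []

    def addSource(self, function: Any, source_name: str,
                  type_info: Optional[Any] = None):
        # Record the arguments so the single-call path can be inspected.
        self.calls.append((function, source_name, type_info))
        return ("j_data_stream", source_name, type_info)


class FakeTypeInfo:
    """Hypothetical stand-in for a TypeInformation wrapper."""

    def get_java_type_info(self):
        return "j_type_info"


def add_source(j_env, source_func, source_name='Custom Source', type_info=None):
    # Resolve the Java type info once; None is forwarded as-is.
    j_type_info = type_info.get_java_type_info() if type_info is not None else None
    # One call to the three-argument overload, no branching on type_info.
    return j_env.addSource(source_func, source_name, j_type_info)
```

With this shape the "no type info" and "explicit type info" cases go through the same code path, which mirrors what the two-argument Java overload does internally.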