shuiqiangchen commented on a change in pull request #13095:
URL: https://github.com/apache/flink/pull/13095#discussion_r467686241



##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -447,6 +447,23 @@ def get_execution_environment():
             .StreamExecutionEnvironment.getExecutionEnvironment()
         return StreamExecutionEnvironment(j_stream_exection_environment)
 
+    def add_source(self, source_func: SourceFunction, source_name: str = 'Custom Source',
+                   type_info: TypeInformation = None) -> 'DataStream':
+        """
+        Adds a data source to the streaming topology.
+
+        :param source_func: the user defined function.
+        :param source_name: name of the data source. Optional.
+        :param type_info: type of the returned stream. Optional.
+        :return: the data stream constructed.
+        """
+        j_type_info = type_info.get_java_type_info() if type_info is not None else None
+        j_data_stream = self._j_stream_execution_environment.addSource(source_func

Review comment:
       Maybe we can do this with a single call to
`addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)`.
`StreamExecutionEnvironment.addSource(SourceFunction<OUT> function, String sourceName)`
just delegates to that three-argument overload with `typeInfo` set to null, so
passing a null `typeInfo` ourselves has the same effect and we don't need to
branch on whether `type_info` was given.
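
To illustrate the suggestion, here is a minimal sketch of the single-call shape. It is not the PR's code: `FakeJavaEnv` and `FakeTypeInfo` are hypothetical stand-ins for the Java environment and for `TypeInformation`, the free function `add_source` takes the environment as an explicit argument, and `source_func` is passed through as-is because the diff above is cut off before showing how it is actually converted. It also assumes a Python `None` reaches the Java side as null.

```python
from typing import Any, List, Optional, Tuple


class FakeJavaEnv:
    """Hypothetical stand-in for the Java StreamExecutionEnvironment."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, str, Optional[Any]]] = []

    def addSource(self, function: Any, source_name: str,
                  type_info: Optional[Any]) -> str:
        # Record the arguments so the single call can be inspected.
        self.calls.append((function, source_name, type_info))
        return "j_data_stream"


class FakeTypeInfo:
    """Hypothetical stand-in for pyflink's TypeInformation."""

    def get_java_type_info(self) -> str:
        return "j_type_info"


def add_source(j_env: FakeJavaEnv, source_func: Any,
               source_name: str = 'Custom Source',
               type_info: Optional[FakeTypeInfo] = None) -> str:
    # Resolve the Java type info once; None is passed through unchanged.
    j_type_info = type_info.get_java_type_info() if type_info is not None else None
    # One call to the three-argument overload covers both cases.
    return j_env.addSource(source_func, source_name, j_type_info)
```

With this shape, calling `add_source(env, func)` and `add_source(env, func, 'name', type_info)` both go through the same `addSource` call; only the third argument differs.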





