HuangXingBo commented on a change in pull request #13292:
URL: https://github.com/apache/flink/pull/13292#discussion_r481923107



##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -553,6 +553,34 @@ def set_python_executable(self, python_exec: str):
             .getEnvironmentConfig(self._j_stream_execution_environment)
         env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec)
 
+    def add_jars(self, jars_path: str):
+        """
+        Adds a list of jar files that contain the user-defined function (UDF) classes and all

Review comment:
      I think `add_jars` is not only used in UDF scenarios, so you may need to update the docstring accordingly.
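
      For example, a possible rewording (just a sketch, the final wording is up to you):

      ```python
      def add_jars(self, jars_path: str):
          """
          Adds a list of jar files that will be added to the pipeline and shipped with
          the job, e.g. connector jars or jars containing user code.

          :param jars_path: Path of jars, delimited by ';'.
          """
      ```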
   

##########
File path: flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
##########
@@ -421,6 +424,51 @@ def check_python_exec(i):
         expected.sort()
         self.assertEqual(expected, result)
 
+    def test_add_jars(self):
+        # find kafka connector jars
+        flink_source_root = _find_flink_source_root()
+        jars_abs_path = flink_source_root + '/flink-connectors/flink-sql-connector-kafka'
+        specific_jars = glob.glob(jars_abs_path + '/target/flink*.jar')
+        specific_jars = ['file://' + specific_jar for specific_jar in specific_jars]
+        specific_jars = ';'.join(specific_jars)
+
+        self.env.add_jars(specific_jars)
+        source_topic = 'test_source_topic'
+        props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+        type_info = Types.ROW([Types.INT(), Types.STRING()])
+
+        # Test for kafka consumer
+        deserialization_schema = JsonRowDeserializationSchema.builder() \
+            .type_info(type_info=type_info).build()
+
+        # Will get a ClassNotFoundException if not add the kafka connector into the pipeline jars.
+        kafka_consumer = FlinkKafkaConsumer(source_topic, deserialization_schema, props)
+        self.env.add_source(kafka_consumer).print()
+        self.env.get_execution_plan()
+
+    def test_add_classpaths(self):
+        # find kafka connector jars
+        flink_source_root = _find_flink_source_root()
+        jars_abs_path = flink_source_root + '/flink-connectors/flink-sql-connector-kafka'
+        specific_jars = glob.glob(jars_abs_path + '/target/flink*.jar')
+        specific_jars = ['file://' + specific_jar for specific_jar in specific_jars]
+        specific_jars = ';'.join(specific_jars)
+
+        self.env.add_classpaths(specific_jars)
+        source_topic = 'test_source_topic'
+        props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+        type_info = Types.ROW([Types.INT(), Types.STRING()])
+
+        # Test for kafka consumer
+        deserialization_schema = JsonRowDeserializationSchema.builder() \
+            .type_info(type_info=type_info).build()
+
+        # Will get a ClassNotFoundException if not add the kafka connector into the pipeline

Review comment:
      ```suggestion
          # It will raise a ClassNotFoundException if the kafka connector is not added into the pipeline
      ```

##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -553,6 +553,34 @@ def set_python_executable(self, python_exec: str):
             .getEnvironmentConfig(self._j_stream_execution_environment)
         env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), python_exec)
 
+    def add_jars(self, jars_path: str):
+        """
+        Adds a list of jar files that contain the user-defined function (UDF) classes and all
+        classes used from within the UDFs.
+
+        :param jars_path: Path of jars, delimited by ';'.
+        """
+        jvm = get_gateway().jvm
+        jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
+        add_jars_to_context_class_loader(jars_path.split(";"))
+        env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+            .getEnvironmentConfig(self._j_stream_execution_environment)
+        env_config.setString(jars_key, jars_path)
+
+    def add_classpaths(self, classpaths: str):
+        """
+        Adds a list of URLs that are added to the classpath of each user code classloader of the
+        program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes

Review comment:
      ```suggestion
          program. Paths must specify a protocol (e.g. file://) and be accessible by all nodes
      ```
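
      For reference, a minimal usage sketch of the two new methods (the jar paths below are placeholders, not real artifacts):

      ```python
      from pyflink.datastream import StreamExecutionEnvironment

      env = StreamExecutionEnvironment.get_execution_environment()

      # add_jars takes a ';'-delimited list of jar URLs and ships the jars
      # with the job (they are written to PipelineOptions.JARS).
      env.add_jars("file:///tmp/connector-a.jar;file:///tmp/connector-b.jar")

      # add_classpaths only registers the URLs on each user code classloader,
      # so the paths must specify a protocol (e.g. file://) and be
      # accessible on all nodes.
      env.add_classpaths("file:///tmp/connector-a.jar")
      ```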



