HuangXingBo commented on a change in pull request #13292:
URL: https://github.com/apache/flink/pull/13292#discussion_r481923107
##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -553,6 +553,34 @@ def set_python_executable(self, python_exec: str):
.getEnvironmentConfig(self._j_stream_execution_environment)
env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(),
python_exec)
+ def add_jars(self, jars_path: str):
+ """
+ Adds a list of jar files that contain the user-defined function (UDF) classes and all
Review comment:
I think `add_jars` is not only used in UDF situations; maybe you need to
update the docstring accordingly.
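For example, the wording could be generalized along these lines (just a sketch of one possible wording, not a binding suggestion):
```python
def add_jars(self, jars_path: str):
    """
    Adds a list of jar files that will be uploaded to the cluster and referenced
    by the job, e.g. connector or format jars, not only UDF jars.

    :param jars_path: Paths of the jars, delimited by ';'.
    """
```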
##########
File path:
flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
##########
@@ -421,6 +424,51 @@ def check_python_exec(i):
expected.sort()
self.assertEqual(expected, result)
+ def test_add_jars(self):
+ # find kafka connector jars
+ flink_source_root = _find_flink_source_root()
+ jars_abs_path = flink_source_root + '/flink-connectors/flink-sql-connector-kafka'
+ specific_jars = glob.glob(jars_abs_path + '/target/flink*.jar')
+ specific_jars = ['file://' + specific_jar for specific_jar in specific_jars]
+ specific_jars = ';'.join(specific_jars)
+
+ self.env.add_jars(specific_jars)
+ source_topic = 'test_source_topic'
+ props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+ type_info = Types.ROW([Types.INT(), Types.STRING()])
+
+ # Test for kafka consumer
+ deserialization_schema = JsonRowDeserializationSchema.builder() \
+ .type_info(type_info=type_info).build()
+
+ # Will get a ClassNotFoundException if not add the kafka connector into the pipeline jars.
+ kafka_consumer = FlinkKafkaConsumer(source_topic, deserialization_schema, props)
+ self.env.add_source(kafka_consumer).print()
+ self.env.get_execution_plan()
+
+ def test_add_classpaths(self):
+ # find kafka connector jars
+ flink_source_root = _find_flink_source_root()
+ jars_abs_path = flink_source_root + '/flink-connectors/flink-sql-connector-kafka'
+ specific_jars = glob.glob(jars_abs_path + '/target/flink*.jar')
+ specific_jars = ['file://' + specific_jar for specific_jar in specific_jars]
+ specific_jars = ';'.join(specific_jars)
+
+ self.env.add_classpaths(specific_jars)
+ source_topic = 'test_source_topic'
+ props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'}
+ type_info = Types.ROW([Types.INT(), Types.STRING()])
+
+ # Test for kafka consumer
+ deserialization_schema = JsonRowDeserializationSchema.builder() \
+ .type_info(type_info=type_info).build()
+
+ # Will get a ClassNotFoundException if not add the kafka connector into the pipeline
Review comment:
```suggestion
# It will raise a ClassNotFoundException if the kafka connector is not added into the pipeline
```
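For context, both tests follow the same pattern: build a ';'-delimited string of `file://` URLs and hand it to the environment before constructing the Kafka source. A minimal usage sketch of the API under review (the jar path is a placeholder):
```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Multiple jars go into one string, delimited by ';';
# each entry needs a protocol prefix such as file://.
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
```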
##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -553,6 +553,34 @@ def set_python_executable(self, python_exec: str):
.getEnvironmentConfig(self._j_stream_execution_environment)
env_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(),
python_exec)
+ def add_jars(self, jars_path: str):
+ """
+ Adds a list of jar files that contain the user-defined function (UDF) classes and all
+ classes used from within the UDFs.
+
+ :param jars_path: Path of jars that delimited by ';'.
+ """
+ jvm = get_gateway().jvm
+ jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
+ add_jars_to_context_class_loader(jars_path.split(";"))
+ env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
+ .getEnvironmentConfig(self._j_stream_execution_environment)
+ env_config.setString(jars_key, jars_path)
+
+ def add_classpaths(self, classpaths: str):
+ """
+ Adds a list of URLs that are added to the classpath of each user code classloader of the
+ program. Paths must specify a protocol (e.g. file://) and be accessible on all nodes
Review comment:
```suggestion
    program. Paths must specify a protocol (e.g. file://) and be accessible by all nodes
```
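Usage would presumably mirror `add_jars`, e.g. (sketch only; both URLs are placeholders):
```python
# Each URL must carry a protocol (e.g. file://) and point to a location
# reachable from every node, such as a path on a shared filesystem.
env.add_classpaths("file:///shared/libs/dep-a.jar;file:///shared/libs/dep-b.jar")
```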
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]