yunfengzhou-hub commented on code in PR #121:
URL: https://github.com/apache/flink-ml/pull/121#discussion_r909134520


##########
flink-ml-python/pyflink/examples/ml/__init__.py:
##########
@@ -15,3 +15,80 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+from py4j.java_gateway import JavaClass, get_java_class, JavaObject
+from pyflink.java_gateway import get_gateway
+from pyflink.util import java_utils
+from pyflink.util.java_utils import to_jarray, load_java_class
+
+
+def add_jars_to_context_class_loader(jar_urls):
+    """
+    Add jars to Python gateway server for local compilation and local execution (i.e. minicluster).
+    There are many component in Flink which won't be added to classpath by default. e.g. Kafka
+    connector, JDBC connector, CSV format etc. This utility function can be used to hot load the
+    jars.
+
+    :param jar_urls: The list of jar urls.
+    """
+    gateway = get_gateway()
+    # validate and normalize
+    jar_urls = [gateway.jvm.java.net.URL(url) for url in jar_urls]
+    context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
+    existing_urls = []
+    class_loader_name = context_classloader.getClass().getName()
+    if class_loader_name == "java.net.URLClassLoader":
+        existing_urls = set([url.toString() for url in context_classloader.getURLs()])
+    if all([url.toString() in existing_urls for url in jar_urls]):
+        # if urls all existed, no need to create new class loader.
+        return
+
+    URLClassLoaderClass = load_java_class("java.net.URLClassLoader")
+    if is_instance_of(context_classloader, URLClassLoaderClass):
+        if class_loader_name == "org.apache.flink.runtime.execution.librarycache." \
+                                "FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader":
+            ensureInner = context_classloader.getClass().getDeclaredMethod("ensureInner", None)
+            ensureInner.setAccessible(True)
+            context_classloader = ensureInner.invoke(context_classloader, None)
+
+        addURL = URLClassLoaderClass.getDeclaredMethod(
+            "addURL",
+            to_jarray(
+                gateway.jvm.Class,
+                [load_java_class("java.net.URL")]))
+        addURL.setAccessible(True)
+
+        for url in jar_urls:
+            addURL.invoke(context_classloader, to_jarray(get_gateway().jvm.Object, [url]))
+
+    else:
+        context_classloader = create_url_class_loader(jar_urls, context_classloader)
+        gateway.jvm.Thread.currentThread().setContextClassLoader(context_classloader)
+
+
+def is_instance_of(java_object, java_class):
+    gateway = get_gateway()
+    if isinstance(java_class, str):
+        param = java_class
+    elif isinstance(java_class, JavaClass):
+        param = get_java_class(java_class)
+    elif isinstance(java_class, JavaObject):
+        if not is_instance_of(java_class, gateway.jvm.Class):
+            param = java_class.getClass()
+        else:
+            param = java_class
+    else:
+        raise TypeError(
+            "java_class must be a string, a JavaClass, or a JavaObject")
+
+    return gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.TypeUtil.isInstanceOf(
+        param, java_object)
+
+
+def create_url_class_loader(urls, parent_class_loader):
+    gateway = get_gateway()
+    url_class_loader = gateway.jvm.java.net.URLClassLoader(
+        to_jarray(gateway.jvm.java.net.URL, urls), parent_class_loader)
+    return url_class_loader
+
+
+java_utils.add_jars_to_context_class_loader = add_jars_to_context_class_loader

Review Comment:
   Yes. It is a workaround for a bug related to FLINK-15635 and FLINK-28002.
I'll add a TODO to stop overwriting `pyflink.util.java_utils` once this bug
is fixed and released.
   
   And according to offline discussion, I'll try to reuse the 
`add_jars_to_context_class_loader` method across `pyflink/ml/__init__.py` and 
`pyflink/examples/ml/__init__.py`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to