Mohammad Kamar created ZEPPELIN-5877:
----------------------------------------

             Summary: IllegalArgumentException when running pyflink interpreter 
with user jars
                 Key: ZEPPELIN-5877
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-5877
             Project: Zeppelin
          Issue Type: Bug
          Components: zeppelin-interpreter
    Affects Versions: 0.10.1
            Reporter: Mohammad Kamar
         Attachments: pyflink-test-jar.zip

Calling some Java constructors and methods from a user-provided jar added through 
"flink.execution.jars" causes "java.lang.IllegalArgumentException: argument 
type mismatch" when the call includes enums or static fields. 


An example paragraph:
{code:java}
%flink.pyflink

from pyflink.java_gateway import get_gateway

gateway = get_gateway()
jvm = gateway.jvm
myInstance = jvm.org.example.MyClass()
param = jvm.org.example.ParameterContainer.staticField

# Either one of these two lines throws IllegalArgumentException on the
# second run of the paragraph
myInstance.doStuff(param)
myInstance.printEnum(jvm.org.example.EnumParameter.CHOICE_1)
{code}
This causes:
{code:java}
java.lang.IllegalArgumentException: argument type mismatch
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750){code}
I attached a zip containing the class definitions; it can be used to build 
the jar and reproduce the error.


The error does not occur on the first run of the paragraph, but it occurs on 
all following runs of the same code. When the Flink interpreter is restarted, 
we get one more successful run before the error appears again.

We first encountered this when using the pyflink Kafka connector: the paragraph 
below produces the same error. It requires 
[flink-connector-kafka_2.12|https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.13.2]
 in flink.execution.jars to run.

 
{code:java}
%flink.pyflink

from pyflink.datastream.connectors import FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema

# This calls a Java constructor with an enum parameter:
# https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.html#FlinkKafkaProducer-java.lang.String-org.apache.flink.api.common.serialization.SerializationSchema-java.util.Properties-org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner-org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic-int-
producer = FlinkKafkaProducer("output-topic", SimpleStringSchema(),
                              {"bootstrap.servers": "localhost:9092"})
{code}
 

I did some analysis and concluded that this bug results from a combination 
of two factors: 
1- py4j 
[caches|https://github.com/py4j/py4j/blob/master/py4j-java/src/main/java/py4j/reflection/ReflectionEngine.java#L68]
 method and constructor invokers.
2- Zeppelin [creates a new 
classloader|https://github.com/apache/zeppelin/blob/0209d0d0d40e979bd284d89ec3eb6a90a34bf84a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L846]
 on every paragraph run.

When a paragraph is run, the user-provided Java classes are loaded on demand 
by the new classloader. When invoking a method or a constructor, however, py4j 
uses the cached invokers, which hold class handles from the classloader of 
the very first run, so the callee and parameters might have an old class 
handle. Static fields and enums, on the other hand, are not cached: py4j 
creates and stores a new "Object" every time they are accessed. So an enum 
parameter loaded by the new classloader, passed to a callee object loaded by 
an old classloader, causes the argument type mismatch error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
