Mika Naylor created FLINK-38418:
-----------------------------------

             Summary: Collecting TIMESTAMP_LTZ values in PyFlink fails with pickling error
                 Key: FLINK-38418
                 URL: https://issues.apache.org/jira/browse/FLINK-38418
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Mika Naylor


It is possible to insert Python {{datetime}} values into a table with a
{{TIMESTAMP_LTZ}} column, but collecting {{TIMESTAMP_LTZ}} values back out of
the table fails with a pickling exception. For example:


{code:python}
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col

settings = EnvironmentSettings.in_streaming_mode()
env = TableEnvironment.create(settings)

env.execute_sql(
    """
CREATE TABLE MyTable (
    a TIMESTAMP_LTZ
) WITH ('connector' = 'datagen')
"""
)

# collect() returns a CloseableIterator; the with block closes it when done.
with env.from_path("MyTable").select(col("a")).limit(1).execute().collect() as results:
    for r in results:
        print(r)
{code}
 

Produces:

 
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromRow.
: org.apache.flink.api.python.shaded.net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments
{code}
This is because
[PythonBridgeUtils.getPickledBytesFromRow|https://github.com/confluentinc/flink/blob/release-1.19-confluent/flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java#L166]
does not contain a case for {{LocalZonedTimestampType}}, so the value falls
through to the default clause at the end, which simply tries to pickle the value
object as-is. For {{TIMESTAMP_LTZ}} that object is a {{java.time.Instant}}, which
cannot be pickled properly: the pickler falls back to JavaBean introspection,
which fails with the error above.

The {{java.sql.Timestamp}} class, which we already use to pickle
{{TimestampType}} values, can be constructed from an {{Instant}} via
{{Timestamp.from}}, so it may be possible to fix this with
{{pickler.dumps(Timestamp.from((Instant) obj))}}. However, this might also
require changes to the to/from SQL type logic in the Python
{{LocalZonedTimestampType}}, since unpickling would then yield a {{datetime}}
object rather than a timestamp.
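A minimal standalone sketch of the proposed conversion step, assuming the fix only needs to turn the {{Instant}} into a {{java.sql.Timestamp}} before it reaches the pickler. The class and method names ({{TimestampLtzPickleSketch}}, {{toPicklable}}) are hypothetical and not part of Flink; the pickler call itself and the Python-side changes are left out. In {{getPickledBytesFromRow}}, the converted value would presumably be passed to {{pickler.dumps(...)}} in a new {{LocalZonedTimestampType}} branch.

{code:java}
import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical sketch: names below are illustrative, not Flink's actual API.
class TimestampLtzPickleSketch {

    // Converts a TIMESTAMP_LTZ value (a java.time.Instant) into a
    // java.sql.Timestamp, the type already used when pickling TIMESTAMP values.
    // Any other object is returned unchanged, like the existing default path.
    static Object toPicklable(Object obj) {
        if (obj instanceof Instant) {
            return Timestamp.from((Instant) obj);
        }
        return obj;
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2024-01-01T12:34:56.789123Z");
        Object converted = toPicklable(instant);
        // Prints java.sql.Timestamp
        System.out.println(converted.getClass().getName());
        // Timestamp.from keeps nanosecond precision, so this prints true
        System.out.println(((Timestamp) converted).toInstant().equals(instant));
    }
}
{code}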

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
