Mika Naylor created FLINK-38418:
-----------------------------------
Summary: Collecting TIMESTAMP_LTZ values in PyFlink fails with
pickling error
Key: FLINK-38418
URL: https://issues.apache.org/jira/browse/FLINK-38418
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Mika Naylor
It is possible to insert Python {{datetime}} values into a table with a
{{TIMESTAMP_LTZ}} column, but reading the values back by collecting a
{{TIMESTAMP_LTZ}} column fails with a pickling exception. For example:
{code:python}
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col

settings = EnvironmentSettings.in_streaming_mode()
env = TableEnvironment.create(settings)

env.execute_sql(
    """
    CREATE TABLE MyTable (
        a TIMESTAMP_LTZ
    ) WITH ('connector' = 'datagen')
    """
)

# Collecting the TIMESTAMP_LTZ column is what triggers the pickling error.
result = env.from_path("MyTable").select(col("a")).limit(1).execute().collect()
for r in result:
    print(r)
{code}
Produces:
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromRow.
: org.apache.flink.api.python.shaded.net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments
{code}
This is because
[PythonBridgeUtils.getPickledBytesFromRow|https://github.com/confluentinc/flink/blob/release-1.19-confluent/flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java#L166]
does not contain a case for {{LocalZonedTimestampType}}, so it falls through to
the default clause at the end, which simply tries to pickle the value object.
The value object is a {{java.time.Instant}}, which can't be pickled properly.
The {{java.sql.Timestamp}} class, which we use to pickle {{TimestampType}}
values, does have a method for constructing one from an {{Instant}}, so I think
it could be possible to fix this via
{{pickler.dumps(Timestamp.from((Instant) obj))}}. However, this might also
require modifying the to/from SQL type logic in the Python
{{LocalZonedTimestampType}}, since the pickled value would then arrive in Python
as a {{datetime}} object and not a timestamp.
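To make the Python-side concern concrete, here is a minimal sketch. It is not the PyFlink implementation. It assumes the conversion currently expects an integer count of microseconds since the epoch, and it treats naive {{datetime}} values as UTC purely for illustration. The helper names {{ltz_from_sql}} and {{ltz_to_sql}} are hypothetical and are not PyFlink APIs.

```python
from datetime import datetime, timedelta, timezone

# Reference point for converting between microsecond counts and datetimes.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ltz_from_sql(value):
    """Hypothetical helper: normalise an incoming value to an aware UTC datetime.

    Assumption: `value` is either an int (microseconds since the epoch) or a
    datetime produced by unpickling. Naive datetimes are treated as UTC here,
    which may not match what the JVM side actually sends.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)
    return EPOCH + timedelta(microseconds=value)


def ltz_to_sql(dt):
    """Hypothetical inverse: aware (or assumed-UTC) datetime to epoch microseconds."""
    if dt is None:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer division of two timedeltas yields an exact int.
    return (dt - EPOCH) // timedelta(microseconds=1)
```

The point of the sketch is only that a conversion written for integers would need an extra branch once the Java side starts sending pickled {{Timestamp}} values. How naive datetimes should be interpreted (session time zone, JVM default, or UTC) is left open here.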
--
This message was sent by Atlassian Jira
(v8.20.10#820010)