[
https://issues.apache.org/jira/browse/FLINK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843572#comment-17843572
]
Wouter Zorgdrager commented on FLINK-35290:
-------------------------------------------
It seems this bug has already been reported in
https://issues.apache.org/jira/browse/FLINK-35180. I will close this issue.
> Wrong Instant type conversion TableAPI to Datastream in thread mode
> -------------------------------------------------------------------
>
> Key: FLINK-35290
> URL: https://issues.apache.org/jira/browse/FLINK-35290
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.18.1
> Reporter: Wouter Zorgdrager
> Priority: Major
>
> In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` column into a
> DataStream, the values arrive as `pyflink.common.time.Instant`. First, I'm
> wondering whether this is expected behavior, since in the Table API
> `TIMESTAMP_LTZ` maps to a Python `datetime`. Can't the same be done for the
> DataStream API?
> Either way, if we switch the execution mode from `process` to `thread`, the
> `TIMESTAMP_LTZ(3)` values are mapped to `pemja.PyJObject` (which wraps a
> `java.time.Instant`) rather than to `pyflink.common.time.Instant`. Note that
> if I use only the DataStream API and read `Types.INSTANT()` directly, the
> conversion works fine in both `thread` and `process` mode.
> Below is a minimal example exposing the bug:
> ```python
> from datetime import datetime
>
> from pyflink.common import Configuration, Row, Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import DataTypes, StreamTableEnvironment
>
> EXECUTION_MODE = "thread"  # or "process"
>
> config = Configuration()
> config.set_string("python.execution-mode", EXECUTION_MODE)
> # Pass the configuration so the execution mode also applies to the DataStream job.
> env = StreamExecutionEnvironment.get_execution_environment(config)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().set("parallelism.default", "1")
> t_env.get_config().set("python.fn-execution.bundle.size", "1")
> t_env.get_config().set("python.execution-mode", EXECUTION_MODE)
>
>
> def to_epoch_ms(row: Row):
>     # Prints pyflink.common.time.Instant in process mode,
>     # but pemja.PyJObject (wrapping java.time.Instant) in thread mode.
>     print(type(row[1]))
>     return row[1].to_epoch_milli()
>
>
> t_env.to_data_stream(
>     t_env.from_elements(
>         [
>             (1, datetime(year=2024, month=9, day=10, hour=9)),
>             (2, datetime(year=2024, month=9, day=10, hour=12)),
>             (3, datetime(year=2024, month=11, day=22, hour=12)),
>         ],
>         DataTypes.ROW(
>             [
>                 DataTypes.FIELD("id", DataTypes.INT()),
>                 DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
>             ]
>         ),
>     )
> ).map(to_epoch_ms, output_type=Types.LONG()).print()
> env.execute()
> ```
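Until FLINK-35180 is resolved, one possible workaround is to make the map function tolerant of every shape the `TIMESTAMP_LTZ` field may arrive in. The sketch below uses a hypothetical helper, `instant_to_epoch_ms`, which is not part of PyFlink. It assumes that a `pemja.PyJObject` wrapping `java.time.Instant` exposes the Java method `toEpochMilli()` through attribute access, and that naive `datetime` values should be read as UTC; both are assumptions, not confirmed behavior.

```python
from datetime import datetime, timedelta, timezone

# Reference point for converting datetime values to epoch milliseconds.
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def instant_to_epoch_ms(value) -> int:
    """Hypothetical helper: epoch milliseconds for a TIMESTAMP_LTZ value.

    Accepts the shapes mentioned in this report:
    - pyflink.common.time.Instant (process mode), which has to_epoch_milli()
    - pemja.PyJObject wrapping java.time.Instant (thread mode); ASSUMED to
      expose the Java method toEpochMilli()
    - datetime.datetime (the Table API mapping)
    """
    if hasattr(value, "to_epoch_milli"):
        return value.to_epoch_milli()
    if hasattr(value, "toEpochMilli"):
        return int(value.toEpochMilli())
    if isinstance(value, datetime):
        if value.tzinfo is None:
            # Assumption: naive datetimes are interpreted as UTC.
            value = value.replace(tzinfo=timezone.utc)
        # Integer division of timedeltas avoids float rounding errors.
        return (value - _EPOCH) // timedelta(milliseconds=1)
    raise TypeError(f"unsupported timestamp type: {type(value)!r}")
```

In the reproduction above, `to_epoch_ms` could then return `instant_to_epoch_ms(row[1])` instead of calling `row[1].to_epoch_milli()` directly, so the same job would run in both `process` and `thread` mode if the assumption about `toEpochMilli()` holds.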