[ https://issues.apache.org/jira/browse/FLINK-35273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Biao Geng updated FLINK-35273:
------------------------------
    Description: 
The issue is from 
https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399
When using TIMESTAMP_LTZ in PyFlink while setting a different time zone, the output does not match the expected result.
Here is my test code:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, Configuration
from pyflink.table import DataTypes, StreamTableEnvironment
from datetime import datetime
import pytz


config = Configuration()
config.set_string("python.client.executable",
                  "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
config.set_string("python.executable",
                  "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
env = StreamExecutionEnvironment.get_execution_environment(config)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set_local_timezone("UTC")
# t_env.get_config().set_local_timezone("GMT-08:00")

input_table = t_env.from_elements(
    [
        (
            "elementA",
            datetime(year=2024, month=4, day=12, hour=8, minute=35),
        ),
        (
            "elementB",
            datetime(year=2024, month=4, day=12, hour=8, minute=35,
                     tzinfo=pytz.utc),
            # datetime(year=2024, month=4, day=12, hour=8, minute=35,
            #          tzinfo=pytz.timezone('America/New_York')),
        ),
    ],
    DataTypes.ROW(
        [
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
        ]
    ),
)
input_table.execute().print()

# SQL
sql_result = t_env.execute_sql(
    "CREATE VIEW MyView1 AS SELECT TO_TIMESTAMP_LTZ(1712910900000, 3);")
t_env.execute_sql(
    "CREATE TABLE Sink (`t` TIMESTAMP_LTZ) WITH ('connector'='print');")
t_env.execute_sql("INSERT INTO Sink SELECT * FROM MyView1;")
{code}
The output is: 
{code:java}
+----+--------------------------------+-------------------------+
| op |                           name |               timestamp |
+----+--------------------------------+-------------------------+
| +I |                       elementA | 2024-04-12 08:35:00.000 |
| +I |                       elementB | 2024-04-12 16:35:00.000 |
+----+--------------------------------+-------------------------+
2 rows in set
+I[2024-04-12T08:35:00Z]
{code}

In pyflink/table/types.py, the `LocalZonedTimestampType` class uses the following logic to convert a Python object to the SQL type:

{code:python}
EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
...
def to_sql_type(self, dt):
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
{code}
This shows that `EPOCH_ORDINAL` is computed once, from the machine's local timezone when the Python VM starts, and is not affected by the timezone set via `set_local_timezone`.
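
To make this concrete, here is a minimal sketch, independent of Flink, assuming a POSIX system where `time.tzset()` is available. It reproduces the `EPOCH_ORDINAL` computation and shows that the value follows the process-local timezone, which is exactly why setting `table.local-time-zone` later cannot influence it:

{code:python}
import calendar
import os
import time

# Same computation as LocalZonedTimestampType.EPOCH_ORDINAL:
# time.localtime(0) depends on the *process* local timezone.
os.environ["TZ"] = "UTC"
time.tzset()
epoch_utc = calendar.timegm(time.localtime(0)) * 10 ** 6  # 0

os.environ["TZ"] = "America/New_York"
time.tzset()
# localtime(0) is 1969-12-31 19:00:00 EST, so timegm() yields -5 hours.
epoch_ny = calendar.timegm(time.localtime(0)) * 10 ** 6

print(epoch_utc, epoch_ny)
{code}

Because the constant is baked in at import time, two clusters running the same job in different system timezones serialize the same naive datetime to different internal instants, regardless of `set_local_timezone`.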


> PyFlink's LocalZonedTimestampType should respect timezone set by set_local_timezone
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-35273
>                 URL: https://issues.apache.org/jira/browse/FLINK-35273
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Biao Geng
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
