[ https://issues.apache.org/jira/browse/FLINK-35273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Biao Geng updated FLINK-35273:
------------------------------
    Description: 
The issue is from https://apache-flink.slack.com/archives/C065944F9M2/p1714134880878399

When using TIMESTAMP_LTZ in PyFlink while setting a different time zone, the output does not match the expected result.

Here is my test code:
{code:python}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, Configuration
from pyflink.table import DataTypes, StreamTableEnvironment
from datetime import datetime
import pytz

config = Configuration()
config.set_string("python.client.executable", "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
config.set_string("python.executable", "/usr/local/Caskroom/miniconda/base/envs/myenv/bin/python")
env = StreamExecutionEnvironment.get_execution_environment(config)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set_local_timezone("UTC")
# t_env.get_config().set_local_timezone("GMT-08:00")

input_table = t_env.from_elements(
    [
        (
            "elementA",
            datetime(year=2024, month=4, day=12, hour=8, minute=35),
        ),
        (
            "elementB",
            datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.utc),
            # datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.timezone('America/New_York')),
        ),
    ],
    DataTypes.ROW(
        [
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
        ]
    ),
)
input_table.execute().print()

# SQL
sql_result = t_env.execute_sql("CREATE VIEW MyView1 AS SELECT TO_TIMESTAMP_LTZ(1712910900000, 3);")
t_env.execute_sql("CREATE TABLE Sink (`t` TIMESTAMP_LTZ) WITH ('connector'='print');")
t_env.execute_sql("INSERT INTO Sink SELECT * FROM MyView1;")
{code}
The output is:
{code:java}
+----+--------------------------------+-------------------------+
| op |                           name |               timestamp |
+----+--------------------------------+-------------------------+
| +I |                       elementA | 2024-04-12 08:35:00.000 |
| +I |                       elementB | 2024-04-12 16:35:00.000 |
+----+--------------------------------+-------------------------+
2 rows in set
+I[2024-04-12T08:35:00Z]
{code}
In pyflink/table/types.py, the `LocalZonedTimestampType` class uses the following logic to convert a Python object to the SQL type:
{code:python}
EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
...
def to_sql_type(self, dt):
    if dt is not None:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
{code}
This shows that EPOCH_ORDINAL is calculated once, when the Python VM starts, and is not affected by the timezone set via `set_local_timezone`.

> PyFlink's LocalZonedTimestampType should respect timezone set by
> set_local_timezone
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-35273
>                 URL: https://issues.apache.org/jira/browse/FLINK-35273
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Biao Geng
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
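The root cause can be demonstrated in isolation. The sketch below is illustrative only: the helper `epoch_ordinal_for_tz` is not part of PyFlink, but it recomputes EPOCH_ORDINAL the same way pyflink/table/types.py does, under different process time zones. The value tracks the TZ environment variable of the Python VM, not the timezone configured later via `set_local_timezone`.

```python
import calendar
import os
import time


def epoch_ordinal_for_tz(tz_name: str) -> int:
    """Recompute PyFlink's EPOCH_ORDINAL under a given process time zone.

    Mirrors `calendar.timegm(time.localtime(0)) * 10 ** 6` from
    pyflink/table/types.py. The helper name and the TZ switching are purely
    illustrative; PyFlink evaluates the expression once at import time.
    """
    os.environ["TZ"] = tz_name
    time.tzset()  # Unix-only: make time.localtime() re-read TZ
    return calendar.timegm(time.localtime(0)) * 10 ** 6


# The offset follows the process time zone, not set_local_timezone:
print(epoch_ordinal_for_tz("UTC"))               # 0
print(epoch_ordinal_for_tz("America/New_York"))  # -18000000000 (UTC-5 at the epoch)
```

Because the epoch offset is frozen at import time from the machine's zone, the conversion in `to_sql_type` cannot honor a timezone chosen afterwards through `set_local_timezone`.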