This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b6a73ed1b59 [FLINK-35282][python] Upgrade Apache Beam >=2.54 (#25541)
6b6a73ed1b59 is described below

commit 6b6a73ed1b593e8ebd4fafd21a485882da3143a9
Author: xaniasd <xan...@gmail.com>
AuthorDate: Mon Jan 20 02:54:23 2025 +0100

    [FLINK-35282][python] Upgrade Apache Beam >=2.54 (#25541)
---
 docs/content.zh/docs/deployment/cli.md             |  2 +-
 docs/content.zh/docs/dev/table/sqlClient.md        |  2 +-
 docs/content/docs/deployment/cli.md                |  2 +-
 docs/content/docs/dev/table/sqlClient.md           |  2 +-
 .../shortcodes/generated/python_configuration.html |  2 +-
 .../apache/flink/client/cli/CliFrontendParser.java |  2 +-
 flink-python/README.md                             |  2 +-
 flink-python/dev/dev-requirements.txt              |  2 +-
 flink-python/pom.xml                               | 92 +++++++++++++++-------
 .../datastream/stream_execution_environment.py     |  2 +-
 .../fn_execution/beam/beam_operations_slow.py      |  4 +-
 .../fn_execution/beam/beam_worker_pool_service.py  |  2 +-
 flink-python/pyflink/table/table_config.py         |  2 +-
 .../pyflink/table/tests/test_pandas_conversion.py  | 13 +--
 flink-python/pyproject.toml                        |  2 +-
 flink-python/setup.py                              |  2 +-
 .../control/DefaultJobBundleFactory.java           | 20 ++---
 .../fnexecution/state/GrpcStateService.java        |  6 +-
 .../apache/beam/sdk/fn/server/ServerFactory.java   | 24 +++---
 .../io/grpc/internal/SharedResourceHolder.java     |  4 +-
 .../org/apache/flink/python/PythonOptions.java     |  2 +-
 .../metric/process/FlinkMetricContainer.java       |  2 +-
 .../org/apache/flink/python/util/ProtoUtils.java   |  2 +-
 .../beam/BeamDataStreamPythonFunctionRunner.java   |  4 +-
 .../python/beam/BeamPythonFunctionRunner.java      | 10 +--
 .../python/beam/state/BeamBagStateHandler.java     |  2 +-
 .../python/beam/state/BeamMapStateHandler.java     |  2 +-
 .../python/beam/state/BeamStateRequestHandler.java |  4 +-
 .../python/beam/BeamTablePythonFunctionRunner.java |  2 +-
 flink-python/src/main/resources/META-INF/NOTICE    | 73 +++++++++--------
 .../metric/process/FlinkMetricContainerTest.java   |  2 +-
 .../PassThroughPythonAggregateFunctionRunner.java  |  2 +-
 .../PassThroughPythonScalarFunctionRunner.java     |  2 +-
 .../PassThroughPythonTableFunctionRunner.java      |  2 +-
 ...ThroughStreamAggregatePythonFunctionRunner.java |  2 +-
 ...amGroupWindowAggregatePythonFunctionRunner.java |  2 +-
 ...ghStreamTableAggregatePythonFunctionRunner.java |  2 +-
 .../src/test/resources/cli/all-mode-help.out       |  2 +-
 .../src/test/resources/cli/embedded-mode-help.out  |  2 +-
 pom.xml                                            |  2 +-
 40 files changed, 172 insertions(+), 140 deletions(-)
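
The version bounds introduced by this commit (apache-beam>=2.54.0,<=2.61.0) can be checked against a local environment with a minimal sketch like the one below. The helper names `parse_release`, `beam_version_supported`, and `installed_beam_supported` are hypothetical and not part of PyFlink. The sketch compares only the numeric release components, so it treats a pre-release such as 2.54.0rc1 like its final release, which is looser than pip's own resolution.

```python
import re
from importlib import metadata

# Bounds copied from the requirement string in this commit:
# apache-beam>=2.54.0,<=2.61.0
MIN_BEAM = (2, 54, 0)
MAX_BEAM = (2, 61, 0)


def parse_release(version):
    """Return the leading numeric components, e.g. '2.54.0' -> (2, 54, 0).

    Assumption: only major.minor[.patch] is inspected; any suffix
    (rc1, .dev0, ...) is ignored, unlike a full PEP 440 comparison.
    """
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError("unrecognized version string: %r" % version)
    major, minor, patch = match.groups()
    return (int(major), int(minor), int(patch or 0))


def beam_version_supported(version):
    """True if the version lies within the inclusive bounds above."""
    return MIN_BEAM <= parse_release(version) <= MAX_BEAM


def installed_beam_supported():
    """Check the apache-beam distribution in the current interpreter, if any."""
    try:
        return beam_version_supported(metadata.version("apache-beam"))
    except metadata.PackageNotFoundError:
        # Beam is not installed in this environment at all.
        return False
```

Calling `installed_beam_supported()` with the interpreter passed to `-pyexec` gives a quick yes/no answer before a job is submitted.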

diff --git a/docs/content.zh/docs/deployment/cli.md b/docs/content.zh/docs/deployment/cli.md
index 8d2f079af713..5b5fac6cd564 100644
--- a/docs/content.zh/docs/deployment/cli.md
+++ b/docs/content.zh/docs/deployment/cli.md
@@ -576,7 +576,7 @@ related options. Here's an overview of all the Python related options for the ac
             <td>
                 Specify the path of the python interpreter used to execute the python UDF worker
                 (e.g.: --pyExecutable /usr/local/bin/python3).
-                The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0),
+                The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.54.0, <= 2.61.0),
                 Pip (version >= 20.3) and SetupTools (version >= 37.0.0).
                 Please ensure that the specified environment meets the above requirements.
             </td>
diff --git a/docs/content.zh/docs/dev/table/sqlClient.md b/docs/content.zh/docs/dev/table/sqlClient.md
index b51c44d603f7..67df4bba6ff5 100644
--- a/docs/content.zh/docs/dev/table/sqlClient.md
+++ b/docs/content.zh/docs/dev/table/sqlClient.md
@@ -322,7 +322,7 @@ Mode "embedded" (default) submits Flink jobs from the local machine.
                                                 /usr/local/bin/python3). The
                                                 python UDF worker depends on
                                                 Python 3.8+, Apache Beam
-                                                (version == 2.43.0), Pip
+                                                (version >= 2.54.0, <= 2.61.0), Pip
                                                 (version >= 20.3) and SetupTools
                                                 (version >= 37.0.0). Please
                                                 ensure that the specified
diff --git a/docs/content/docs/deployment/cli.md b/docs/content/docs/deployment/cli.md
index 9250704ef417..d0b13c3ab59f 100644
--- a/docs/content/docs/deployment/cli.md
+++ b/docs/content/docs/deployment/cli.md
@@ -574,7 +574,7 @@ related options. Here's an overview of all the Python related options for the ac
             <td>
                 Specify the path of the python interpreter used to execute the python UDF worker
                 (e.g.: --pyExecutable /usr/local/bin/python3).
-                The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0),
+                The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.54.0,<= 2.61.0),
                 Pip (version >= 20.3) and SetupTools (version >= 37.0.0).
                 Please ensure that the specified environment meets the above requirements.
             </td>
diff --git a/docs/content/docs/dev/table/sqlClient.md b/docs/content/docs/dev/table/sqlClient.md
index 7a228319aed1..3a868269f989 100644
--- a/docs/content/docs/dev/table/sqlClient.md
+++ b/docs/content/docs/dev/table/sqlClient.md
@@ -260,7 +260,7 @@ Mode "embedded" (default) submits Flink jobs from the local machine.
                                                 /usr/local/bin/python3). The
                                                 python UDF worker depends on
                                                 Python 3.8+, Apache Beam
-                                                (version == 2.43.0), Pip
+                                                (version >= 2.54.0, <= 2.61.0), Pip
                                                 (version >= 20.3) and SetupTools
                                                 (version >= 37.0.0). Please
                                                 ensure that the specified
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html
index d99c1f2a3b95..257b3365b054 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -24,7 +24,7 @@
             <td><h5>python.executable</h5></td>
             <td style="word-wrap: break-word;">"python"</td>
             <td>String</td>
-            <td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0), Pip (version &gt;= 20.3) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
+            <td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.54.0, <= 2.61.0), Pip (version &gt;= 20.3) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
         </tr>
         <tr>
             <td><h5>python.execution-mode</h5></td>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 1124aeecded1..ac3aefe116f1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -275,7 +275,7 @@ public class CliFrontendParser {
                     true,
                     "Specify the path of the python interpreter used to execute the python UDF worker "
                             + "(e.g.: --pyExecutable /usr/local/bin/python3). "
-                            + "The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0), "
+                            + "The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.54.0, <= 2.61.0), "
                             + "Pip (version >= 20.3) and SetupTools (version >= 37.0.0). "
                             + "Please ensure that the specified environment meets the above requirements.");
 
diff --git a/flink-python/README.md b/flink-python/README.md
index f8daeefd212c..457f748d9560 100644
--- a/flink-python/README.md
+++ b/flink-python/README.md
@@ -26,7 +26,7 @@ The auto-generated Python docs can be found at [https://nightlies.apache.org/fli
 
 ## Python Requirements
 
-Apache Flink Python API depends on Py4J (currently version 0.10.9.7), CloudPickle (currently version 2.2.0), python-dateutil (currently version >=2.8.0,<3), Apache Beam (currently version >=2.43.0,<2.49.0).
+Apache Flink Python API depends on Py4J (currently version 0.10.9.7), CloudPickle (currently version 2.2.0), python-dateutil (currently version >=2.8.0,<3), Apache Beam (currently version >= 2.54.0, <= 2.61.0).
 
 ## Development Notices
 
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index 0e7d3fadb0ce..63649ab7e98d 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -15,7 +15,7 @@
 pip>=20.3
 setuptools>=18.0
 wheel
-apache-beam>=2.43.0,<2.49.0
+apache-beam>=2.54.0,<=2.61.0
 cython>=0.29.24
 py4j==0.10.9.7
 python-dateutil>=2.8.0,<3
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 3ac65f7cc4e3..16d9095c4e12 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -17,8 +17,9 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
        <modelVersion>4.0.0</modelVersion>
 
@@ -454,7 +455,8 @@ under the License.
                                                                <delete 
dir="${project.basedir}/build"/>
                                                                <delete 
dir="${project.basedir}/apache-flink-libraries/build"/>
                                                                <delete 
dir="${project.basedir}/apache_flink.egg-info"/>
-                                                               <delete 
dir="${project.basedir}/apache-flink-libraries/apache_flink_libraries.egg-info"/>
+                                                               <delete
+                                                                       
dir="${project.basedir}/apache-flink-libraries/apache_flink_libraries.egg-info"/>
                                                        </target>
                                                </configuration>
                                        </execution>
@@ -504,15 +506,17 @@ under the License.
                                                                         
basedir="${project.build.directory}/test-classes"
                                                                         
includes="**/PythonFunctionFactoryTest.class"/>
 
-                                                               <jar 
destfile="${project.build.directory}/artifacts/testDataStream.jar"
-                                                                        
basedir="${project.build.directory}/test-classes"
-                                                                        
includes="**/DataStreamTestCollectSink.class,**/MyCustomSourceFunction.class,**/PartitionCustomTestMapFunction.class"/>
+                                                               <jar
+                                                                       
destfile="${project.build.directory}/artifacts/testDataStream.jar"
+                                                                       
basedir="${project.build.directory}/test-classes"
+                                                                       
includes="**/DataStreamTestCollectSink.class,**/MyCustomSourceFunction.class,**/PartitionCustomTestMapFunction.class"/>
 
                                                                <jar 
destfile="${project.build.directory}/artifacts/dummy.jar"
                                                                         
basedir="${project.build.directory}/test-classes"
                                                                         
includes="**/TestJob.class">
                                                                        
<manifest>
-                                                                               
<attribute name="Main-Class" value="org.apache.flink.client.cli.TestJob" />
+                                                                               
<attribute name="Main-Class"
+                                                                               
                   value="org.apache.flink.client.cli.TestJob"/>
                                                                        
</manifest>
                                                                </jar>
                                                        </target>
@@ -581,7 +585,8 @@ under the License.
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        
<groupId>org.apache.flink</groupId>
-                                                                       
<artifactId>flink-sql-connector-aws-kinesis-firehose</artifactId>
+                                                                       
<artifactId>flink-sql-connector-aws-kinesis-firehose
+                                                                       
</artifactId>
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        
<groupId>org.apache.flink</groupId>
@@ -593,7 +598,8 @@ under the License.
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        
<groupId>org.apache.flink</groupId>
-                                                                       
<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
+                                                                       
<artifactId>flink-connector-cassandra_${scala.binary.version}
+                                                                       
</artifactId>
                                                                </artifactItem>
                                                                <artifactItem>
                                                                        
<groupId>org.apache.flink</groupId>
@@ -618,7 +624,8 @@ under the License.
                                                                        
<artifactId>flink-test-utils-junit</artifactId>
                                                                </artifactItem>
                                                        </artifactItems>
-                                                       
<outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+                                                       
<outputDirectory>${project.build.directory}/test-dependencies
+                                                       </outputDirectory>
                                                </configuration>
                                        </execution>
                                </executions>
@@ -635,7 +642,8 @@ under the License.
                                                </goals>
                                                <configuration>
                                                        
<includeGroupIds>junit</includeGroupIds>
-                                                       
<outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+                                                       
<outputDirectory>${project.build.directory}/test-dependencies
+                                                       </outputDirectory>
                                                </configuration>
                                        </execution>
                                </executions>
@@ -669,17 +677,25 @@ under the License.
                                                                <filter>
                                                                        
<artifact>org.apache.beam:beam-sdks-java-core</artifact>
                                                                        
<excludes>
-                                                                               
<exclude>org/apache/beam/repackaged/core/org/antlr/**</exclude>
-                                                                               
<exclude>org/apache/beam/repackaged/core/org/apache/commons/compress/**</exclude>
-                                                                               
<exclude>org/apache/beam/repackaged/core/org/apache/commons/lang3/**</exclude>
+                                                                               
<exclude>org/apache/beam/repackaged/core/org/antlr/**
+                                                                               
</exclude>
+                                                                               
<exclude>
+                                                                               
        org/apache/beam/repackaged/core/org/apache/commons/compress/**
+                                                                               
</exclude>
+                                                                               
<exclude>
+                                                                               
        org/apache/beam/repackaged/core/org/apache/commons/lang3/**
+                                                                               
</exclude>
                                                                        
</excludes>
                                                                </filter>
                                                                <filter>
                                                                        
<artifact>org.apache.beam:beam-vendor-grpc-1_43_2</artifact>
                                                                        
<excludes>
-                                                                               
<exclude>org/apache/beam/vendor/grpc/v1p43p2/org/jboss/**</exclude>
+                                                                               
<exclude>org/apache/beam/vendor/grpc/v1p43p2/org/jboss/**
+                                                                               
</exclude>
                                                                                
<exclude>schema/**</exclude>
-                                                                               
<exclude>org/apache/beam/vendor/grpc/v1p43p2/org/eclipse/jetty/**</exclude>
+                                                                               
<exclude>
+                                                                               
        org/apache/beam/vendor/grpc/v1p43p2/org/eclipse/jetty/**
+                                                                               
</exclude>
                                                                        
</excludes>
                                                                </filter>
                                                                <filter>
@@ -687,6 +703,7 @@ under the License.
                                                                        
<excludes>
                                                                                
<exclude>LICENSE-junit.txt</exclude>
                                                                                
<exclude>LICENSE.txt</exclude>
+                                                                               
<exclude>LICENSE</exclude>
                                                                                
<exclude>META-INF/LICENSE.txt</exclude>
                                                                                
<exclude>*.proto</exclude>
                                                                        
</excludes>
@@ -695,43 +712,60 @@ under the License.
                                                        <relocations 
combine.children="append">
                                                                <relocation>
                                                                        
<pattern>py4j</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.py4j</shadedPattern>
+                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.py4j
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>net.razorvine</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.net.razorvine</shadedPattern>
+                                                                       
<shadedPattern>
+                                                                               
org.apache.flink.api.python.shaded.net.razorvine
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>com.fasterxml.jackson</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.com.fasterxml.jackson</shadedPattern>
+                                                                       
<shadedPattern>
+                                                                               
org.apache.flink.api.python.shaded.com.fasterxml.jackson
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>org.joda.time</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.org.joda.time</shadedPattern>
+                                                                       
<shadedPattern>
+                                                                               
org.apache.flink.api.python.shaded.org.joda.time
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>com.google.protobuf</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.com.google.protobuf</shadedPattern>
+                                                                       
<shadedPattern>
+                                                                               
org.apache.flink.api.python.shaded.com.google.protobuf
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>org.apache.arrow</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.org.apache.arrow</shadedPattern>
+                                                                       
<shadedPattern>
+                                                                               
org.apache.flink.api.python.shaded.org.apache.arrow
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>io.netty</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.io.netty</shadedPattern>
+                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.io.netty
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>com.google.flatbuffers</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.com.google.flatbuffers</shadedPattern>
+                                                                       
<shadedPattern>
+                                                                               
org.apache.flink.api.python.shaded.com.google.flatbuffers
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>com.google.protobuf</pattern>
-                                                                       
<shadedPattern>org.apache.flink.api.python.shaded.com.google.protobuf</shadedPattern>
+                                                                       
<shadedPattern>
+                                                                               
org.apache.flink.api.python.shaded.com.google.protobuf
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                                <relocation>
                                                                        
<pattern>org.apache.avro</pattern>
-                                                                       
<shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
+                                                                       
<shadedPattern>org.apache.flink.avro.shaded.org.apache.avro
+                                                                       
</shadedPattern>
                                                                </relocation>
                                                        </relocations>
                                                </configuration>
@@ -749,7 +783,8 @@ under the License.
                                                        <goal>run</goal>
                                                </goals>
                                                <configuration>
-                                                       
<protocArtifact>com.google.protobuf:protoc:${protoc.version}</protocArtifact>
+                                                       
<protocArtifact>com.google.protobuf:protoc:${protoc.version}
+                                                       </protocArtifact>
                                                        <inputDirectories>
                                                                
<include>pyflink/proto</include>
                                                        </inputDirectories>
@@ -764,7 +799,8 @@ under the License.
                                        <systemPropertyVariables>
                                                <!-- Arrow requires the 
property io.netty.tryReflectionSetAccessible to
                                                be set to true for JDK >= 9. 
Please refer to ARROW-5412 for more details. -->
-                                               
<io.netty.tryReflectionSetAccessible>true</io.netty.tryReflectionSetAccessible>
+                                               
<io.netty.tryReflectionSetAccessible>true
+                                               
</io.netty.tryReflectionSetAccessible>
                                        </systemPropertyVariables>
                                </configuration>
                        </plugin>
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 00e639ec3797..98f68cef9a71 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -545,7 +545,7 @@ class StreamExecutionEnvironment(object):
 
         .. note::
 
-            The python udf worker depends on Apache Beam (version == 2.43.0).
+            The python udf worker depends on Apache Beam (version >= 2.54.0, <= 2.61.0).
             Please ensure that the specified environment meets the above requirements.
 
         :param python_exec: The path of python interpreter.
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
index 603ba4c3b55e..a43765d76098 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -95,8 +95,8 @@ class FunctionOperation(Operation):
         if not self._has_side_output:
             self._main_output_processor = self._output_processors[DEFAULT_OUTPUT_TAG][0]
 
-    def setup(self):
-        super(FunctionOperation, self).setup()
+    def setup(self, data_sampler=None):
+        super().setup(data_sampler)
 
     def start(self):
         with self.scoped_start_state:
diff --git a/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
index 24935383c574..ea23d3700430 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
@@ -156,7 +156,7 @@ class BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorker
                 control_address=control_service_descriptor.url,
                 status_address=status_service_descriptor.url,
                 worker_id=_worker_id,
-                state_cache_size=sdk_worker_main._get_state_cache_size(experiments),
+                state_cache_size=sdk_worker_main._get_state_cache_size_bytes(sdk_pipeline_options),
                 data_buffer_time_limit_ms=sdk_worker_main._get_data_buffer_time_limit_ms(
                     experiments),
                 profiler_factory=profiler.Profile.factory_from_options(
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index ac5eba30633c..9c80e59a49a7 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -258,7 +258,7 @@ class TableConfig(object):
 
         .. note::
 
-            The python udf worker depends on Apache Beam (version == 2.43.0).
+            The python udf worker depends on Apache Beam (version >= 2.54.0, <= 2.61.0).
             Please ensure that the specified environment meets the above requirements.
 
         :param python_exec: The path of python interpreter.
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 703fb9b512f5..9cc0f8ccdf67 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -31,7 +31,7 @@ class PandasConversionTestBase(object):
 
     @classmethod
     def setUpClass(cls):
-        super(PandasConversionTestBase, cls).setUpClass()
+        super().setUpClass()
         cls.data = [(1, 1, 1, 1, True, 1.1, 1.2, 'hello', bytearray(b"aaa"),
                      decimal.Decimal('1000000000000000000.01'), datetime.date(2014, 9, 13),
                      datetime.time(hour=1, minute=0, second=1),
@@ -236,12 +236,5 @@ class StreamPandasConversionTests(PandasConversionITTests,
         result_pdf = t.to_pandas()
         import pandas as pd
         os.remove(source_path)
-        assert_frame_equal(result_pdf, pd.DataFrame(
-            data={"rowtime": [
-                datetime.datetime(2018, 3, 11, 3, 10),
-                datetime.datetime(2018, 3, 11, 3, 10),
-                datetime.datetime(2018, 3, 11, 3, 10),
-                datetime.datetime(2018, 3, 11, 3, 40),
-                datetime.datetime(2018, 3, 11, 4, 20),
-                datetime.datetime(2018, 3, 11, 3, 30),
-            ]}))
+        expected_df = pd.DataFrame(data={"rowtime": pd.Series(data, dtype="datetime64[ms]")})
+        assert_frame_equal(result_pdf, expected_df)
diff --git a/flink-python/pyproject.toml b/flink-python/pyproject.toml
index 0ec9012fbc8d..248a86017ece 100644
--- a/flink-python/pyproject.toml
+++ b/flink-python/pyproject.toml
@@ -21,7 +21,7 @@ requires = [
     "packaging>=20.5; platform_machine=='arm64'",  # macos M1
     "setuptools>=18.0",
     "wheel",
-    "apache-beam>=2.43.0,<2.49.0",
+    "apache-beam>=2.54.0,<=2.61.0",
     "cython>=0.29.24",
     "fastavro>=1.1.0,!=1.8.0"
 ]
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 3b03f41c7c77..119f6ba4585d 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -317,7 +317,7 @@ try:
         'pyflink.bin': ['*']}
 
     install_requires = ['py4j==0.10.9.7', 'python-dateutil>=2.8.0,<3',
-                        'apache-beam>=2.43.0,<2.49.0',
+                        'apache-beam>=2.54.0,<=2.61.0',
                         'cloudpickle>=2.2.0', 'avro-python3>=1.8.1,!=1.9.2',
                         'pytz>=2018.3', 'fastavro>=1.1.0,!=1.8.0', 'requests>=2.26.0',
                         'protobuf>=3.19.0',
diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 8ac786f0c2e0..25523f02944f 100644
--- a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -56,16 +56,16 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.util.NoopLock;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index 1eb1d472e3bc..0cac066567c1 100644
--- a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -21,8 +21,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
 import org.apache.beam.sdk.fn.server.FnService;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCallStreamObserver;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -30,7 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables.getStackTraceAsString;
 
 /** An implementation of the Beam Fn State service. */
 public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
diff --git a/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java b/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java
index 2d75c653120d..57fcfbcdaf49 100644
--- a/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java
+++ b/flink-python/src/main/java/org/apache/beam/sdk/fn/server/ServerFactory.java
@@ -19,17 +19,17 @@ package org.apache.beam.sdk.fn.server;
 
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.BindableService;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ServerBuilder;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ServerInterceptors;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.netty.NettyServerBuilder;
-import org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.EpollEventLoopGroup;
-import org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
-import org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.EpollServerSocketChannel;
-import org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.unix.DomainSocketAddress;
-import org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.internal.ThreadLocalRandom;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.BindableService;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ServerBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ServerInterceptors;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.netty.NettyServerBuilder;
+import org.apache.beam.vendor.grpc.v1p60p1.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.beam.vendor.grpc.v1p60p1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.netty.channel.epoll.EpollServerSocketChannel;
+import org.apache.beam.vendor.grpc.v1p60p1.io.netty.channel.unix.DomainSocketAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.netty.util.internal.ThreadLocalRandom;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
 // This class is copied from Beam's org.apache.beam.sdk.fn.server.ServerFactory,
 // can be removed after https://github.com/apache/beam/issues/21598 is fixed.
diff --git a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p48p1/io/grpc/internal/SharedResourceHolder.java b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p60p1/io/grpc/internal/SharedResourceHolder.java
similarity index 98%
rename from flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p48p1/io/grpc/internal/SharedResourceHolder.java
rename to flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p60p1/io/grpc/internal/SharedResourceHolder.java
index ab08ce1f850c..dab28cac5edf 100644
--- a/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p48p1/io/grpc/internal/SharedResourceHolder.java
+++ b/flink-python/src/main/java/org/apache/beam/vendor/grpc/v1p60p1/io/grpc/internal/SharedResourceHolder.java
@@ -14,9 +14,9 @@
  * limitations under the License.
  */
 
-package org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal;
+package org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal;
 
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions;
 
 import javax.annotation.concurrent.ThreadSafe;
 
diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 142457bd19cc..7aab21e27fbf 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -160,7 +160,7 @@ public class PythonOptions {
                     .withDescription(
                             "Specify the path of the python interpreter used to execute the python "
                                     + "UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam "
-                                    + "(version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). "
+                                    + "(version >= 2.54.0, <= 2.61.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). "
                                     + "Please ensure that the specified environment meets the above requirements. The "
                                     + "option is equivalent to the command line option \"-pyexec\".");
 
diff --git a/flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java b/flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
index 4759fc542216..32f286b42659 100644
--- a/flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/metric/process/FlinkMetricContainer.java
@@ -43,7 +43,7 @@ import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
index 29691f445bb7..a752c69c011e 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java
@@ -58,7 +58,7 @@ public enum ProtoUtils {
                         RunnerApi.FunctionSpec.newBuilder()
                                 .setUrn(FLINK_CODER_URN)
                                 .setPayload(
-                                        org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf
+                                        org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf
                                                 .ByteString.copyFrom(
                                                 coderInfoDescriptor.toByteArray()))
                                 .build())
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
index 823e91ccfc58..23cc2a53eb49 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.java
@@ -182,7 +182,7 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner
                                 RunnerApi.FunctionSpec.newBuilder()
                                         .setUrn(STATELESS_FUNCTION_URN)
                                         .setPayload(
-                                                org.apache.beam.vendor.grpc.v1p48p1.com.google
+                                                org.apache.beam.vendor.grpc.v1p60p1.com.google
                                                         .protobuf.ByteString.copyFrom(
                                                         proto.toByteArray()))
                                         .build());
@@ -199,7 +199,7 @@ public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner
                                 RunnerApi.FunctionSpec.newBuilder()
                                         .setUrn(urn)
                                         .setPayload(
-                                                org.apache.beam.vendor.grpc.v1p48p1.com.google
+                                                org.apache.beam.vendor.grpc.v1p60p1.com.google
                                                         .protobuf.ByteString.copyFrom(
                                                         proto.toByteArray()))
                                         .build());
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 4e52378f7a69..0548be14c4cf 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -75,8 +75,8 @@ import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -586,7 +586,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
                 RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
                         .setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
                         .setPayload(
-                                org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString
+                                org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString
                                         .copyFrom(baos.toByteArray()))
                         .setInputOrOutputId(INPUT_COLLECTION_ID)
                         .build());
@@ -594,7 +594,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
                 RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
                         .setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
                         .setPayload(
-                                org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString
+                                org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString
                                         .copyFrom(baos.toByteArray()))
                         .setInputOrOutputId(OUTPUT_COLLECTION_ID)
                         .build());
@@ -604,7 +604,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
                     RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
                             .setUrn(getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
                             .setPayload(
-                                    org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf
+                                    org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf
                                             .ByteString.copyFrom(baos.toByteArray()))
                             .setInputOrOutputId(entry.getKey())
                             .build());
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamBagStateHandler.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamBagStateHandler.java
index 96c6089651a3..12e23eca2439 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamBagStateHandler.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamBagStateHandler.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 
 import javax.annotation.Nullable;
 
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamMapStateHandler.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamMapStateHandler.java
index 8b62d656c26f..0b181f6bf411 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamMapStateHandler.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamMapStateHandler.java
@@ -28,7 +28,7 @@ import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
 
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 
 import java.util.HashMap;
 import java.util.Iterator;
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamStateRequestHandler.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamStateRequestHandler.java
index edcdb921073a..3cb259c2eec5 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamStateRequestHandler.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/state/BeamStateRequestHandler.java
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
 
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.common.base.Charsets;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Charsets;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 
 import javax.annotation.Nullable;
 
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
index 50a2e16111cf..3df85ee3a7ef 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
@@ -95,7 +95,7 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
                                 RunnerApi.FunctionSpec.newBuilder()
                                         .setUrn(functionUrn)
                                         .setPayload(
-                                                org.apache.beam.vendor.grpc.v1p48p1.com.google
+                                                org.apache.beam.vendor.grpc.v1p60p1.com.google
                                                         .protobuf.ByteString.copyFrom(
                                                         userDefinedFunctionProto.toByteArray()))
                                         .build())
diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE
index a10cdce6152d..a3449323b149 100644
--- a/flink-python/src/main/resources/META-INF/NOTICE
+++ b/flink-python/src/main/resources/META-INF/NOTICE
@@ -11,23 +11,23 @@ This project bundles the following dependencies under the Apache Software Licens
 - com.fasterxml.jackson.core:jackson-databind:2.18.2
 - com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.18.2
 - com.google.flatbuffers:flatbuffers-java:1.12.0
-- io.netty:netty-buffer:4.1.100.Final
-- io.netty:netty-common:4.1.100.Final
 - joda-time:joda-time:2.5
 - org.apache.arrow:arrow-format:13.0.0
 - org.apache.arrow:arrow-memory-core:13.0.0
 - org.apache.arrow:arrow-memory-netty:13.0.0
 - org.apache.arrow:arrow-vector:13.0.0
-- org.apache.beam:beam-model-fn-execution:2.43.0
-- org.apache.beam:beam-model-job-management:2.43.0
-- org.apache.beam:beam-model-pipeline:2.43.0
-- org.apache.beam:beam-runners-core-construction-java:2.43.0
-- org.apache.beam:beam-runners-core-java:2.43.0
-- org.apache.beam:beam-runners-java-fn-execution:2.43.0
-- org.apache.beam:beam-sdks-java-core:2.43.0
-- org.apache.beam:beam-sdks-java-fn-execution:2.43.0
-- org.apache.beam:beam-vendor-guava-26_0-jre:0.1
-- org.apache.beam:beam-vendor-grpc-1_48_1:0.1
+- org.apache.beam:beam-model-fn-execution:2.54.0
+- org.apache.beam:beam-model-job-management:2.54.0
+- org.apache.beam:beam-model-pipeline:2.54.0
+- org.apache.beam:beam-runners-core-construction-java:2.54.0
+- org.apache.beam:beam-runners-core-java:2.54.0
+- org.apache.beam:beam-runners-java-fn-execution:2.54.0
+- org.apache.beam:beam-sdks-java-core:2.54.0
+- org.apache.beam:beam-sdks-java-fn-execution:2.54.0
+- org.apache.beam:beam-sdks-java-extensions-avro:2.54.0
+- org.apache.beam:beam-sdks-java-transform-service-launcher:2.54.0
+- org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1
+- org.apache.beam:beam-vendor-grpc-1_60_1:0.1
 - com.alibaba:pemja:0.4.1
 
 This project bundles the following dependencies under the BSD license.
@@ -40,39 +40,42 @@ This project bundles the following dependencies under the MIT license. (https://
 See bundled license files for details.
 
 - net.razorvine:pyrolite:4.13
+- org.checkerframework:checker-qual:3.42.0
+- io.github.classgraph:classgraph:4.8.162
+- org.slf4j:slf4j-api:1.7.36
+- args4j:args4j:2.33
 
 The bundled Apache Beam dependencies bundle the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
 - com.google.api.grpc:proto-google-common-protos:2.9.0
 - com.google.code.gson:gson:2.9.0
-- com.google.guava:guava:31.1-jre
-- io.grpc:grpc-auth:1.48.1
-- io.grpc:grpc-core:1.48.1
-- io.grpc:grpc-context:1.48.1
-- io.grpc:grpc-netty:1.48.1
-- io.grpc:grpc-protobuf:1.48.1
-- io.grpc:grpc-stub:1.48.1
-- io.grpc:grpc-testing:1.48.1
-- io.netty:netty-buffer:4.1.77.Final
-- io.netty:netty-codec:4.1.77.Final
-- io.netty:netty-codec-http:4.1.77.Final
-- io.netty:netty-codec-http2:4.1.77.Final
-- io.netty:netty-codec-socks:4.1.77.Final
-- io.netty:netty-common:4.1.77.Final
-- io.netty:netty-handler:4.1.77.Final
-- io.netty:netty-handler-proxy:4.1.77.Final
-- io.netty:netty-resolver:4.1.77.Final
-- io.netty:netty-transport:4.1.77.Final
-- io.netty:netty-transport-native-epoll:4.1.77.Final:linux-x86_64
-- io.netty:netty-transport-native-unix-common:4.1.77.Final
-- io.netty:netty-tcnative-boringssl-static:2.0.53.Final
+- com.google.guava:guava:32.1.2-jre
+- io.grpc:grpc-auth:1.59.1
+- io.grpc:grpc-core:1.59.1
+- io.grpc:grpc-context:1.59.1
+- io.grpc:grpc-netty:1.59.1
+- io.grpc:grpc-protobuf:1.59.1
+- io.grpc:grpc-stub:1.59.1
+- io.grpc:grpc-testing:1.59.1
+- io.netty:netty-buffer:4.1.100.Final
+- io.netty:netty-codec:4.1.100.Final
+- io.netty:netty-codec-http:4.1.100.Final
+- io.netty:netty-codec-http2:4.1.100.Final
+- io.netty:netty-codec-socks:4.1.100.Final
+- io.netty:netty-common:4.1.100.Final
+- io.netty:netty-handler:4.1.100.Final
+- io.netty:netty-handler-proxy:4.1.100.Final
+- io.netty:netty-resolver:4.1.100.Final
+- io.netty:netty-transport:4.1.100.Final
+- io.netty:netty-transport-native-epoll:4.1.100.Final:linux-x86_64
+- io.netty:netty-transport-native-unix-common:4.1.100.Final
 - io.opencensus:opencensus-api:0.31.0
 - io.opencensus:opencensus-contrib-grpc-metrics:0.31.0
-- io.perfmark:perfmark-api:0.25.0
+- io.perfmark:perfmark-api:0.26.0
+- com.google.auto.value:auto-value-annotations:1.8.2
 
 The bundled Apache Beam dependencies bundle the following dependencies under the BSD license.
 See bundled license files for details
 
 - com.google.auth:google-auth-library-credentials:1.4.0
-- com.google.protobuf:protobuf-java:3.21.1
 - com.google.protobuf:protobuf-java-util:3.21.1
diff --git a/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java b/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
index 16595fd8ac0d..33f98353c88d 100644
--- a/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/metric/process/FlinkMetricContainerTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
index af8c70a64e09..f6ef62eaa514 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctio
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 721fb9f2e4cb..30fd500705f7 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctio
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
 
 import java.util.LinkedList;
 import java.util.List;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index 0c8f41a64231..56e93ce7de05 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -27,7 +27,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctio
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
 
 import java.util.LinkedList;
 import java.util.List;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
index 6455cc52ef4c..091704616bdb 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamAggregatePythonFunctionRunner.java
@@ -29,7 +29,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctio
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
 
 import java.util.LinkedList;
 import java.util.List;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
index b41676141d7e..35b1016e5fd4 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamGroupWindowAggregatePythonFunctionRunner.java
@@ -29,7 +29,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctio
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
 
 import static org.apache.flink.python.util.ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto;
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
index d6e2986cd3d5..51540fb26a68 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughStreamTableAggregatePythonFunctionRunner.java
@@ -29,7 +29,7 @@ import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctio
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
 
 import java.util.Arrays;
 import java.util.LinkedList;
diff --git a/flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out b/flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out
index ccb8d66e9048..af61e25aedd5 100644
--- a/flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out
+++ b/flink-table/flink-sql-client/src/test/resources/cli/all-mode-help.out
@@ -85,7 +85,7 @@ Mode "embedded" (default) submits Flink jobs from the local machine.
                                                 /usr/local/bin/python3). The
                                                 python UDF worker depends on
                                                 Python 3.8+, Apache Beam
-                                                (version == 2.43.0), Pip
+                                                (version >= 2.54.0, <= 2.61.0), Pip
                                                 (version >= 20.3) and SetupTools
                                                 (version >= 37.0.0). Please
                                                 ensure that the specified
diff --git 
a/flink-table/flink-sql-client/src/test/resources/cli/embedded-mode-help.out 
b/flink-table/flink-sql-client/src/test/resources/cli/embedded-mode-help.out
index 40ad2c0084ba..01ebe179d17d 100644
--- a/flink-table/flink-sql-client/src/test/resources/cli/embedded-mode-help.out
+++ b/flink-table/flink-sql-client/src/test/resources/cli/embedded-mode-help.out
@@ -82,7 +82,7 @@ Mode "embedded" (default) submits Flink jobs from the local 
machine.
                                                 /usr/local/bin/python3). The
                                                 python UDF worker depends on
                                                 Python 3.8+, Apache Beam
-                                                (version == 2.43.0), Pip
+                                                (version >= 2.54.0, <= 2.61.0), Pip
                                                 (version >= 20.3) and SetupTools
                                                 (version >= 37.0.0). Please
                                                 ensure that the specified
diff --git a/pom.xml b/pom.xml
index 2d6c9933d1b0..b7685e862f79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,7 +154,7 @@ under the License.
                <hamcrest.version>1.3</hamcrest.version>
                <assertj.version>3.23.1</assertj.version>
                <py4j.version>0.10.9.7</py4j.version>
-               <beam.version>2.43.0</beam.version>
+               <beam.version>2.54.0</beam.version>
                <protoc.version>3.21.7</protoc.version>
                <okhttp.version>3.14.9</okhttp.version>
                <testcontainers.version>1.20.2</testcontainers.version>
