This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c31e646a8e [FLINK-25986][python] Add remaining FLIP-190 API methods 
to Python Table API
3c31e646a8e is described below

commit 3c31e646a8efc13943b33d012a70d748cf11cdbc
Author: Mika Naylor <[email protected]>
AuthorDate: Fri Mar 14 10:20:47 2025 +0100

    [FLINK-25986][python] Add remaining FLIP-190 API methods to Python Table API
    
    This closes #26281.
---
 .../reference/pyflink.table/compiled_plans.rst     |  98 +++++++++++++++
 .../docs/reference/pyflink.table/descriptors.rst   |   8 +-
 .../docs/reference/pyflink.table/index.rst         |   1 +
 .../docs/reference/pyflink.table/statement_set.rst |   8 +-
 .../docs/reference/pyflink.table/table.rst         |   1 +
 .../reference/pyflink.table/table_environment.rst  |   9 +-
 flink-python/pyflink/table/__init__.py             |   6 +
 flink-python/pyflink/table/compiled_plan.py        | 140 +++++++++++++++++++++
 flink-python/pyflink/table/plan_reference.py       | 130 +++++++++++++++++++
 flink-python/pyflink/table/resolved_expression.py  |   2 +-
 flink-python/pyflink/table/schema.py               |   3 +-
 flink-python/pyflink/table/statement_set.py        |  30 +++++
 flink-python/pyflink/table/table.py                |   9 ++
 flink-python/pyflink/table/table_environment.py    |  64 ++++++++++
 flink-python/pyflink/table/table_pipeline.py       |  30 +++++
 .../table/tests/jsonplan/testGetJsonPlan.out       |  87 +++++++++++++
 .../pyflink/table/tests/test_compiled_plan.py      | 110 ++++++++++++++++
 ...eness.py => test_compiled_plan_completeness.py} |  22 ++--
 ...ness.py => test_plan_reference_completeness.py} |  20 +--
 ...eness.py => test_statement_set_completeness.py} |  22 ++--
 .../pyflink/table/tests/test_table_completeness.py |   2 +-
 ...s.py => test_table_environment_completeness.py} |  32 +++--
 .../pyflink/table/tests/test_table_pipeline.py     |  16 +++
 .../tests/test_table_pipeline_completeness.py      |   4 -
 .../org/apache/flink/table/api/Compilable.java     |   2 +-
 .../org/apache/flink/table/api/CompiledPlan.java   |   4 +
 .../org/apache/flink/table/api/PlanReference.java  |   2 +-
 pom.xml                                            |   1 +
 28 files changed, 796 insertions(+), 67 deletions(-)

diff --git a/flink-python/docs/reference/pyflink.table/compiled_plans.rst 
b/flink-python/docs/reference/pyflink.table/compiled_plans.rst
new file mode 100644
index 00000000000..94f45fc8e41
--- /dev/null
+++ b/flink-python/docs/reference/pyflink.table/compiled_plans.rst
@@ -0,0 +1,98 @@
+.. 
################################################################################
+     Licensed to the Apache Software Foundation (ASF) under one
+     or more contributor license agreements.  See the NOTICE file
+     distributed with this work for additional information
+     regarding copyright ownership.  The ASF licenses this file
+     to you under the Apache License, Version 2.0 (the
+     "License"); you may not use this file except in compliance
+     with the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+     Unless required by applicable law or agreed to in writing, software
+     distributed under the License is distributed on an "AS IS" BASIS,
+     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     See the License for the specific language governing permissions and
+    limitations under the License.
+   
################################################################################
+
+
+==============
+Compiled Plans
+==============
+
+CompiledPlan
+------------
+
+Represents an immutable, fully optimized, and executable entity that has been 
compiled from a
+Table & SQL API pipeline definition. It encodes operators, expressions, 
functions, data types,
+and table connectors.
+
+Every new Flink version might introduce improved optimizer rules, more 
efficient operators,
+and other changes that impact the behavior of previously defined pipelines. In 
order to ensure
+backwards compatibility and enable stateful streaming job upgrades, compiled 
plans can be
+persisted and reloaded across Flink versions. See the
+`website documentation <https://flink.apache.org/documentation/>`_ for more 
information about
+provided guarantees during stateful pipeline upgrades.
+
+A plan can be compiled from a SQL query using
+:func:`~pyflink.table.TableEnvironment.compile_plan_sql`.
+It can be persisted using :func:`~pyflink.table.CompiledPlan.write_to_file` or 
by manually
+extracting the JSON representation with 
func:`~pyflink.table.CompiledPlan.as_json_string`.
+A plan can be loaded back from a file or a string using
+:func:`~pyflink.table.TableEnvironment.load_plan` with a 
:class:`~pyflink.table.PlanReference`.
+Instances can be executed using :func:`~pyflink.table.CompiledPlan.execute`.
+
+Depending on the configuration, permanent catalog metadata (such as 
information about tables
+and functions) will be persisted in the plan as well. Anonymous/inline objects 
will be
+persisted (including schema and options) if possible or fail the compilation 
otherwise.
+For temporary objects, only the identifier is part of the plan and the object 
needs to be
+present in the session context during a restore.
+
+JSON encoding is assumed to be the default representation of a compiled plan 
in all API
+endpoints, and is the format used to persist the plan to files by default.
+For advanced use cases, :func:`~pyflink.table.CompiledPlan.as_smile_bytes` 
provides a binary
+format representation of the compiled plan.
+
+.. note::
+    Plan restores assume a stable session context. Configuration, loaded 
modules and
+    catalogs, and temporary objects must not change. Schema evolution and 
changes of function
+    signatures are not supported.
+
+.. currentmodule:: pyflink.table
+
+.. autosummary::
+    :toctree: api/
+
+    CompiledPlan.as_json_string
+    CompiledPlan.as_smile_bytes
+    CompiledPlan.write_to_file
+    CompiledPlan.get_flink_version
+    CompiledPlan.print_json_string
+    CompiledPlan.execute
+    CompiledPlan.explain
+    CompiledPlan.print_explain
+
+PlanReference
+-------------
+
+Unresolved pointer to a persisted plan.
+
+A plan represents a static, executable entity that has been compiled from a 
Table & SQL API
+pipeline definition.
+
+You can load the content of this reference into a 
:class:`~pyflink.table.CompiledPlan`
+using :func:`~pyflink.table.TableEnvironment.load_plan` with a
+:class:`~pyflink.table.PlanReference`, or you can directly load and execute it 
with
+:func:`~pyflink.table.TableEnvironment.execute_plan`.
+
+.. seealso:: :class:`~pyflink.table.CompiledPlan`
+
+.. currentmodule:: pyflink.table
+
+.. autosummary::
+    :toctree: api/
+
+    PlanReference.from_file
+    PlanReference.from_json_string
+    PlanReference.from_smile_bytes
diff --git a/flink-python/docs/reference/pyflink.table/descriptors.rst 
b/flink-python/docs/reference/pyflink.table/descriptors.rst
index d606c4c11eb..55b198114b9 100644
--- a/flink-python/docs/reference/pyflink.table/descriptors.rst
+++ b/flink-python/docs/reference/pyflink.table/descriptors.rst
@@ -111,7 +111,7 @@ TableSchema
 
 A table schema that represents a table's structure with field names and data 
types.
 
-.. currentmodule:: pyflink.table.table_schema
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
@@ -132,7 +132,7 @@ ChangelogMode
 
 The set of changes contained in a changelog.
 
-.. currentmodule:: pyflink.table.changelog_mode
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
@@ -147,11 +147,13 @@ TablePipeline
 
 Describes a complete pipeline from one or more source tables to a sink table.
 
-.. currentmodule:: pyflink.table.table_pipeline
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
 
+    TablePipeline.compile_plan
     TablePipeline.execute
     TablePipeline.explain
+    TablePipeline.print_explain
     TablePipeline.get_sink_identifier
diff --git a/flink-python/docs/reference/pyflink.table/index.rst 
b/flink-python/docs/reference/pyflink.table/index.rst
index b4d26648ad8..326fb86327b 100644
--- a/flink-python/docs/reference/pyflink.table/index.rst
+++ b/flink-python/docs/reference/pyflink.table/index.rst
@@ -34,3 +34,4 @@ This page gives an overview of all public PyFlink Table API.
     descriptors
     statement_set
     catalog
+    compiled_plans
diff --git a/flink-python/docs/reference/pyflink.table/statement_set.rst 
b/flink-python/docs/reference/pyflink.table/statement_set.rst
index a822cb63e3d..74ea74d5a43 100644
--- a/flink-python/docs/reference/pyflink.table/statement_set.rst
+++ b/flink-python/docs/reference/pyflink.table/statement_set.rst
@@ -28,7 +28,7 @@ The planner can optimize all added statements together and 
then submit them as o
 
 The added statements will be cleared when calling the 
:func:`~StatementSet.execute` method.
 
-.. currentmodule:: pyflink.table.statement_set
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
@@ -36,7 +36,9 @@ The added statements will be cleared when calling the 
:func:`~StatementSet.execu
     StatementSet.add_insert_sql
     StatementSet.attach_as_datastream
     StatementSet.add_insert
+    StatementSet.compile_plan
     StatementSet.explain
+    StatementSet.print_explain
     StatementSet.execute
 
 
@@ -45,7 +47,7 @@ TableResult
 
 A :class:`~pyflink.table.TableResult` is the representation of the statement 
execution result.
 
-.. currentmodule:: pyflink.table.table_result
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
@@ -73,7 +75,7 @@ The statement (e.g. DDL, USE) executes successfully, and the 
result only contain
 The statement (e.g. DML, DQL, SHOW) executes successfully, and the result 
contains important
 content.
 
-.. currentmodule:: pyflink.table.table_result
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
diff --git a/flink-python/docs/reference/pyflink.table/table.rst 
b/flink-python/docs/reference/pyflink.table/table.rst
index 4c6843d8c4c..c4bd4eadfac 100644
--- a/flink-python/docs/reference/pyflink.table/table.rst
+++ b/flink-python/docs/reference/pyflink.table/table.rst
@@ -88,6 +88,7 @@ Example:
     Table.execute
     Table.execute_insert
     Table.explain
+    Table.print_explain
     Table.fetch
     Table.filter
     Table.flat_aggregate
diff --git a/flink-python/docs/reference/pyflink.table/table_environment.rst 
b/flink-python/docs/reference/pyflink.table/table_environment.rst
index 0a444d0d862..5825bd4a567 100644
--- a/flink-python/docs/reference/pyflink.table/table_environment.rst
+++ b/flink-python/docs/reference/pyflink.table/table_environment.rst
@@ -42,7 +42,7 @@ Example:
 :func:`~EnvironmentSettings.in_streaming_mode` or 
:func:`~EnvironmentSettings.in_batch_mode`
 might be convenient as shortcuts.
 
-.. currentmodule:: pyflink.table.environment_settings
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
@@ -144,13 +144,14 @@ keyword, thus must be escaped) in a catalog named 'cat.1' 
and database named 'db
     other Flink APIs, it might be necessary to use one of the available 
language-specific table
     environments in the corresponding bridging modules.
 
-.. currentmodule:: pyflink.table.table_environment
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
 
     TableEnvironment.add_python_archive
     TableEnvironment.add_python_file
+    TableEnvironment.compile_plan_sql
     TableEnvironment.create
     TableEnvironment.create_java_function
     TableEnvironment.create_java_temporary_function
@@ -169,6 +170,7 @@ keyword, thus must be escaped) in a catalog named 'cat.1' 
and database named 'db
     TableEnvironment.drop_temporary_table
     TableEnvironment.drop_temporary_view
     TableEnvironment.drop_view
+    TableEnvironment.execute_plan
     TableEnvironment.execute_sql
     TableEnvironment.explain_sql
     TableEnvironment.from_descriptor
@@ -190,6 +192,7 @@ keyword, thus must be escaped) in a catalog named 'cat.1' 
and database named 'db
     TableEnvironment.list_user_defined_functions
     TableEnvironment.list_views
     TableEnvironment.load_module
+    TableEnvironment.load_plan
     TableEnvironment.create_catalog
     TableEnvironment.register_catalog
     TableEnvironment.set_python_requirements
@@ -202,7 +205,7 @@ keyword, thus must be escaped) in a catalog named 'cat.1' 
and database named 'db
 StreamTableEnvironment
 ----------------------
 
-.. currentmodule:: pyflink.table.table_environment
+.. currentmodule:: pyflink.table
 
 .. autosummary::
     :toctree: api/
diff --git a/flink-python/pyflink/table/__init__.py 
b/flink-python/pyflink/table/__init__.py
index 861afacb941..ad7b8b8b3ef 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -113,11 +113,13 @@ Other important classes:
 from __future__ import absolute_import
 
 from pyflink.table.changelog_mode import ChangelogMode
+from pyflink.table.compiled_plan import CompiledPlan
 from pyflink.table.data_view import DataView, ListView, MapView
 from pyflink.table.environment_settings import EnvironmentSettings
 from pyflink.table.explain_detail import ExplainDetail
 from pyflink.table.expression import Expression
 from pyflink.table.module import Module, ModuleEntry
+from pyflink.table.plan_reference import PlanReference
 from pyflink.table.result_kind import ResultKind
 from pyflink.table.schema import Schema
 from pyflink.table.sql_dialect import SqlDialect
@@ -127,6 +129,7 @@ from pyflink.table.table import GroupWindowedTable, 
GroupedTable, OverWindowedTa
 from pyflink.table.table_config import TableConfig
 from pyflink.table.table_descriptor import TableDescriptor, FormatDescriptor
 from pyflink.table.table_environment import (TableEnvironment, 
StreamTableEnvironment)
+from pyflink.table.table_pipeline import TablePipeline
 from pyflink.table.table_result import TableResult
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import DataTypes, UserDefinedType, Row, RowKind
@@ -168,4 +171,7 @@ __all__ = [
     'ChangelogMode',
     'ExplainDetail',
     'ResultKind',
+    'CompiledPlan',
+    'PlanReference',
+    'TablePipeline'
 ]
diff --git a/flink-python/pyflink/table/compiled_plan.py 
b/flink-python/pyflink/table/compiled_plan.py
new file mode 100644
index 00000000000..be224ed91d8
--- /dev/null
+++ b/flink-python/pyflink/table/compiled_plan.py
@@ -0,0 +1,140 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pathlib import Path
+from typing import Union
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table.explain_detail import ExplainDetail
+from pyflink.table.table_result import TableResult
+from pyflink.util.java_utils import to_j_explain_detail_arr
+
+__all__ = ["CompiledPlan"]
+
+
+class CompiledPlan(object):
+    """
+    Represents an immutable, fully optimized, and executable entity that has 
been compiled from a
+    Table & SQL API pipeline definition. It encodes operators, expressions, 
functions, data types,
+    and table connectors.
+
+    Every new Flink version might introduce improved optimizer rules, more 
efficient operators,
+    and other changes that impact the behavior of previously defined 
pipelines. In order to ensure
+    backwards compatibility and enable stateful streaming job upgrades, 
compiled plans can be
+    persisted and reloaded across Flink versions. See the website 
documentation for more
+    information about provided guarantees during stateful pipeline upgrades.
+
+    A plan can be compiled from a SQL query using
+    :func:`~pyflink.table.TableEnvironment.compile_plan_sql`.
+    It can be persisted using 
:func:`~pyflink.table.CompiledPlan.write_to_file` or by manually
+    extracting the JSON representation with 
func:`~pyflink.table.CompiledPlan.as_json_string`.
+    A plan can be loaded back from a file or a string using
+    :func:`~pyflink.table.TableEnvironment.load_plan` with a 
:class:`~pyflink.table.PlanReference`.
+    Instances can be executed using 
:func:`~pyflink.table.CompiledPlan.execute`.
+
+    Depending on the configuration, permanent catalog metadata (such as 
information about tables
+    and functions) will be persisted in the plan as well. Anonymous/inline 
objects will be
+    persisted (including schema and options) if possible or fail the 
compilation otherwise.
+    For temporary objects, only the identifier is part of the plan and the 
object needs to be
+    present in the session context during a restore.
+
+    JSON encoding is assumed to be the default representation of a compiled 
plan in all API
+    endpoints, and is the format used to persist the plan to files by default.
+    For advanced use cases, :func:`~pyflink.table.CompiledPlan.as_smile_bytes` 
provides a binary
+    format representation of the compiled plan.
+
+    .. note::
+        Plan restores assume a stable session context. Configuration, loaded 
modules and
+        catalogs, and temporary objects must not change. Schema evolution and 
changes of function
+        signatures are not supported.
+    """
+
+    def __init__(self, j_compiled_plan, t_env):
+        self._j_compiled_plan = j_compiled_plan
+        self._t_env = t_env
+
+    def __str__(self) -> str:
+        return self._j_compiled_plan.toString()
+
+    def as_json_string(self) -> str:
+        """
+        Convert the plan to a JSON string representation.
+        """
+        return self._j_compiled_plan.asJsonString()
+
+    def as_smile_bytes(self) -> bytes:
+        """
+        Convert the plan to a Smile binary representation.
+        """
+        return self._j_compiled_plan.asSmileBytes()
+
+    def write_to_file(self, file: Union[str, Path], ignore_if_exists: bool = 
False):
+        """
+        Writes this plan to a file using the JSON representation.
+
+        :param ignore_if_exists: If a plan exists in the given file path and 
this flag is true,
+            no operation is executed and the plan is not overwritten. An 
exception is thrown
+            otherwise.
+        :raises TableException: if the file cannot be written or if 
``ignore_if_exists`` is false
+            and a plan already exists.
+        """
+        self._j_compiled_plan.writeToFile(str(file), ignore_if_exists)
+
+    def get_flink_version(self) -> str:
+        """
+        Returns the Flink version used to compile the plan.
+        """
+        return str(self._j_compiled_plan.getFlinkVersion())
+
+    def print_json_string(self):
+        """
+        Like :func:`~pyflink.table.CompiledPlan.as_json_string`, but prints 
the result to the
+        client console.
+
+        .. versionadded:: 2.1.0
+        """
+        self._j_compiled_plan.printJsonString()
+
+    def execute(self) -> TableResult:
+        """
+        Executes the compiled plan.
+        """
+        self._t_env._before_execute()
+        return TableResult(self._j_compiled_plan.execute())
+
+    def explain(self, *extra_details: ExplainDetail) -> str:
+        """
+        Returns the AST and the execution plan of the compiled plan.
+
+        :param extra_details: The extra explain details which the explain 
result should include,
+                              e.g. estimated cost, changelog mode for streaming
+        :return: AST and execution plans
+        """
+        gateway = get_gateway()
+        j_extra_details = to_j_explain_detail_arr(extra_details)
+        return self._j_compiled_plan.explain(
+            gateway.jvm.org.apache.flink.table.api.ExplainFormat.TEXT, 
j_extra_details
+        )
+
+    def print_explain(self, *extra_details: ExplainDetail):
+        """
+        Like :func:`~pyflink.table.CompiledPlan.explain`, but prints the 
result to the client
+        console.
+
+        .. versionadded:: 2.1.0
+        """
+        print(self.explain(*extra_details))
diff --git a/flink-python/pyflink/table/plan_reference.py 
b/flink-python/pyflink/table/plan_reference.py
new file mode 100644
index 00000000000..d3fa02f767e
--- /dev/null
+++ b/flink-python/pyflink/table/plan_reference.py
@@ -0,0 +1,130 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink.java_gateway import get_gateway
+from pathlib import Path
+
+__all__ = [
+    "PlanReference",
+    "FilePlanReference",
+    "JsonContentPlanReference",
+    "BytesContentPlanReference",
+]
+
+
+class PlanReference(object):
+    """
+    Unresolved pointer to a persisted plan.
+
+    A plan represents a static, executable entity that has been compiled from 
a Table & SQL API
+    pipeline definition.
+
+    You can load the content of this reference into a 
:class:`~pyflink.table.CompiledPlan`
+    using :func:`~pyflink.table.TableEnvironment.load_plan` with a
+    :class:`~pyflink.table.PlanReference`, or you can directly load and 
execute it with
+    :func:`~pyflink.table.TableEnvironment.execute_plan`.
+
+    .. seealso:: :class:`~pyflink.table.CompiledPlan`
+    """
+
+    def __init__(self, j_plan_reference):
+        self._j_plan_reference = j_plan_reference
+
+    def __str__(self):
+        return self._j_plan_reference.toString()
+
+    def __hash__(self):
+        return self._j_plan_reference.hashCode()
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and 
self._j_plan_reference.equals(
+            other._j_plan_reference
+        )
+
+    @staticmethod
+    def from_file(path: Union[str, Path]) -> "FilePlanReference":
+        """
+        Create a reference starting from a file path.
+        """
+        gateway = get_gateway()
+        return FilePlanReference(
+            
j_plan_reference=gateway.jvm.org.apache.flink.table.api.PlanReference.fromFile(
+                str(path)
+            )
+        )
+
+    @staticmethod
+    def from_json_string(json_string: str) -> "JsonContentPlanReference":
+        """
+        Create a reference starting from a JSON string.
+        """
+        gateway = get_gateway()
+        return JsonContentPlanReference(
+            
j_plan_reference=gateway.jvm.org.apache.flink.table.api.PlanReference.fromJsonString(
+                json_string
+            )
+        )
+
+    @staticmethod
+    def from_smile_bytes(smile_bytes: bytes) -> "BytesContentPlanReference":
+        """
+        Create a reference starting from a Smile binary representation.
+        """
+        gateway = get_gateway()
+        return BytesContentPlanReference(
+            
j_plan_reference=gateway.jvm.org.apache.flink.table.api.PlanReference.fromSmileBytes(
+                smile_bytes
+            )
+        )
+
+
+class FilePlanReference(PlanReference):
+    """
+    Plan reference to a file in the local filesystem.
+    """
+
+    def get_file(self) -> Path:
+        """
+        Get the canonical path of the referenced file.
+        """
+        return Path(self._j_plan_reference.getFile().getCanonicalPath())
+
+
+class JsonContentPlanReference(PlanReference):
+    """
+    Plan reference to a string containing the serialized persisted plan in 
JSON.
+    """
+
+    def get_content(self) -> str:
+        """
+        Get the content of the referenced plan as a JSON string.
+        """
+        return self._j_plan_reference.getContent()
+
+
+class BytesContentPlanReference(PlanReference):
+    """
+    Plan reference to binary bytes containing the serialized persisted plan in 
Smile.
+    """
+
+    def get_content(self) -> bytes:
+        """
+        Return the content of the referenced plan as Smile binary bytes.
+        """
+        return self._j_plan_reference.getContent()
diff --git a/flink-python/pyflink/table/resolved_expression.py 
b/flink-python/pyflink/table/resolved_expression.py
index b5953e4e4d5..166bc4ebb79 100644
--- a/flink-python/pyflink/table/resolved_expression.py
+++ b/flink-python/pyflink/table/resolved_expression.py
@@ -17,7 +17,7 @@
 
################################################################################
 from typing import List
 
-from pyflink.table import Expression
+from pyflink.table.expression import Expression
 from pyflink.table.types import DataType, _from_java_data_type
 
 __all__ = ["ResolvedExpression"]
diff --git a/flink-python/pyflink/table/schema.py 
b/flink-python/pyflink/table/schema.py
index c70a8c01fee..666bbe61532 100644
--- a/flink-python/pyflink/table/schema.py
+++ b/flink-python/pyflink/table/schema.py
@@ -18,8 +18,7 @@
 from typing import Union, List, TYPE_CHECKING
 
 from pyflink.java_gateway import get_gateway
-from pyflink.table import Expression
-from pyflink.table.expression import _get_java_expression
+from pyflink.table.expression import Expression, _get_java_expression
 from pyflink.table.types import DataType, _to_java_data_type
 from pyflink.util.java_utils import to_jarray
 
diff --git a/flink-python/pyflink/table/statement_set.py 
b/flink-python/pyflink/table/statement_set.py
index 07ece0a56b1..456c342c958 100644
--- a/flink-python/pyflink/table/statement_set.py
+++ b/flink-python/pyflink/table/statement_set.py
@@ -19,6 +19,7 @@ from typing import Union
 
 from pyflink.java_gateway import get_gateway
 from pyflink.table import ExplainDetail
+from pyflink.table.compiled_plan import CompiledPlan
 from pyflink.table.table_descriptor import TableDescriptor
 from pyflink.table.table_pipeline import TablePipeline
 from pyflink.table.table_result import TableResult
@@ -156,6 +157,15 @@ class StatementSet(object):
         j_extra_details = to_j_explain_detail_arr(extra_details)
         return self._j_statement_set.explain(TEXT, j_extra_details)
 
+    def print_explain(self, *extra_details: ExplainDetail):
+        """
+        Like :func:`~pyflink.table.StatementSet.explain`, but prints the 
result to the client
+        console.
+
+        .. versionadded:: 2.1.0
+        """
+        print(self.explain(*extra_details))
+
     def execute(self) -> TableResult:
         """
         execute all statements and Tables as a batch.
@@ -169,3 +179,23 @@ class StatementSet(object):
         """
         self._t_env._before_execute()
         return TableResult(self._j_statement_set.execute())
+
+    def compile_plan(self) -> CompiledPlan:
+        """
+        Compiles all statements into a :class:`~pyflink.table.CompiledPlan` 
that can be executed
+        as one job.
+
+        :class:`~pyflink.table.CompiledPlan`s can be persisted and reloaded 
across Flink versions.
+        They describe static pipelines to ensure backwards compatibility and 
enable stateful
+        streaming job upgrades. See :class:`~pyflink.table.CompiledPlan` and 
the website
+        documentation for more information.
+
+        .. note::
+            The compiled plan feature is experimental in batch mode.
+
+        :raises TableException: if any of the statements is invalid or if the 
plan cannot be
+            persisted.
+
+        .. versionadded:: 2.1.0
+        """
+        return 
CompiledPlan(j_compiled_plan=self._j_statement_set.compilePlan(), 
t_env=self._t_env)
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index 8726d3d52b0..0a4a9cd8489 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1090,6 +1090,15 @@ class Table(object):
         j_extra_details = to_j_explain_detail_arr(extra_details)
         return self._j_table.explain(TEXT, j_extra_details)
 
+    def print_explain(self, *extra_details: ExplainDetail):
+        """
+        Like :func:`~pyflink.table.Table.explain`, but prints the result to 
the client
+        console.
+
+        .. versionadded:: 2.1.0
+        """
+        print(self.explain(*extra_details))
+
     def insert_into(
         self, table_path_or_descriptor: Union[str, TableDescriptor], 
overwrite: bool = False
     ) -> TablePipeline:
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index 245cf1ea148..61ac1553242 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -34,6 +34,8 @@ from pyflink.serializers import BatchedSerializer, 
PickleSerializer
 from pyflink.table import Table, EnvironmentSettings, Expression, 
ExplainDetail, \
     Module, ModuleEntry, Schema, ChangelogMode
 from pyflink.table.catalog import Catalog, CatalogDescriptor
+from pyflink.table.compiled_plan import CompiledPlan
+from pyflink.table.plan_reference import PlanReference
 from pyflink.table.serializers import ArrowSerializer
 from pyflink.table.statement_set import StatementSet
 from pyflink.table.table_config import TableConfig
@@ -1409,6 +1411,68 @@ class TableEnvironment(object):
         finally:
             os.unlink(temp_file.name)
 
+    def load_plan(self, plan_reference: PlanReference) -> CompiledPlan:
+        """
+        Loads a plan from a :class:`~pyflink.table.PlanReference` into a
+        :class:`~pyflink.table.CompiledPlan`.
+
+        Compiled plans can be persisted and reloaded across Flink versions. 
They describe static
+        pipelines to ensure backwards compatibility and enable stateful 
streaming job upgrades. See
+        :class:`~pyflink.table.CompiledPlan` and the website documentation for 
more information.
+
+        This method will parse the input reference and will validate the plan. 
The returned
+        instance can be executed via 
:func:`~pyflink.table.CompiledPlan.execute`.
+
+        .. note::
+            The compiled plan feature is experimental in batch mode.
+
+        :raises TableException: if the plan cannot be loaded from the 
filesystem, or if the plan
+            is invalid.
+
+        .. versionadded:: 2.1.0
+        """
+        return CompiledPlan(
+            
j_compiled_plan=self._j_tenv.loadPlan(plan_reference._j_plan_reference),
+            t_env=self._j_tenv
+        )
+
+    def compile_plan_sql(self, stmt: str) -> CompiledPlan:
+        """
+        Compiles a SQL DML statement into a 
:class:`~pyflink.table.CompiledPlan`.
+
+        Compiled plans can be persisted and reloaded across Flink versions. 
They describe static
+        pipelines to ensure backwards compatibility and enable stateful 
streaming job upgrades. See
+        :class:`~pyflink.table.CompiledPlan` and the website documentation for 
more information.
+
+        .. note::
+            Only ``INSERT INTO`` is supported at the moment.
+
+        .. note::
+            The compiled plan feature is experimental in batch mode.
+
+        .. seealso::
+            :func:`~pyflink.table.TableEnvironment.load_plan`
+            :func:`~pyflink.table.CompiledPlan.execute`
+
+        :raises TableException: if the SQL statement is invalid or if the plan 
cannot be
+            persisted.
+
+        .. versionadded:: 2.1.0
+        """
+        return CompiledPlan(j_compiled_plan=self._j_tenv.compilePlanSql(stmt), 
t_env=self._j_tenv)
+
+    def execute_plan(self, plan_reference: PlanReference) -> TableResult:
+        """
+        Shorthand for ``tEnv.load_plan(plan_reference).execute()``.
+
+        .. seealso::
+            :func:`~pyflink.table.TableEnvironment.load_plan`
+            :func:`~pyflink.table.CompiledPlan.execute`
+
+        .. versionadded:: 2.1.0
+        """
+        return self.load_plan(plan_reference).execute()
+
     def _set_python_executable_for_local_executor(self):
         jvm = get_gateway().jvm
         j_config = get_j_env_configuration(self._get_j_env())
diff --git a/flink-python/pyflink/table/table_pipeline.py 
b/flink-python/pyflink/table/table_pipeline.py
index e2c82ac2ab4..64900e0df1e 100644
--- a/flink-python/pyflink/table/table_pipeline.py
+++ b/flink-python/pyflink/table/table_pipeline.py
@@ -20,6 +20,7 @@ from typing import Optional
 from pyflink.java_gateway import get_gateway
 from pyflink.table import ExplainDetail
 from pyflink.table.catalog import ObjectIdentifier
+from pyflink.table.compiled_plan import CompiledPlan
 from pyflink.table.table_result import TableResult
 from pyflink.util.java_utils import to_j_explain_detail_arr
 
@@ -63,6 +64,15 @@ class TablePipeline(object):
             gateway.jvm.org.apache.flink.table.api.ExplainFormat.TEXT, 
j_extra_details
         )
 
+    def print_explain(self, *extra_details: ExplainDetail):
+        """
+        Like :func:`~pyflink.table.TablePipeline.explain`, but prints the 
result to the client
+        console.
+
+        .. versionadded:: 2.1.0
+        """
+        print(self.explain(*extra_details))
+
     def get_sink_identifier(self) -> Optional[ObjectIdentifier]:
         """
         Returns the sink table's 
:class:`~pyflink.table.catalog.ObjectIdentifier`, if any.
@@ -79,3 +89,23 @@ class TablePipeline(object):
             if optional_result.isPresent()
             else None
         )
+
+    def compile_plan(self) -> CompiledPlan:
+        """
+        Compiles this :class:`TablePipeline` into a 
:class:`~pyflink.table.CompiledPlan` that can
+        be executed as one job.
+
+        :class:`~pyflink.table.CompiledPlan`s can be persisted and reloaded 
across Flink versions.
+        They describe static pipelines to ensure backwards compatibility and 
enable stateful
+        streaming job upgrades. See :class:`~pyflink.table.CompiledPlan` and 
the website
+        documentation for more information.
+
+        .. note::
+            The compiled plan feature is experimental in batch mode.
+
+        :raises TableException: if any of the statements is invalid or if the 
plan cannot be
+            persisted.
+
+        .. versionadded:: 2.1.0
+        """
+        return 
CompiledPlan(j_compiled_plan=self._j_table_pipeline.compilePlan(), 
t_env=self._t_env)
diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out 
b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
new file mode 100644
index 00000000000..7fcfad4bd7f
--- /dev/null
+++ b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
@@ -0,0 +1,87 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 0,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "datagen",
+            "number-of-rows" : "5"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 0,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "blackhole"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 0,
+    "target" : 0,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-python/pyflink/table/tests/test_compiled_plan.py 
b/flink-python/pyflink/table/tests/test_compiled_plan.py
new file mode 100644
index 00000000000..fe0675e5939
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_compiled_plan.py
@@ -0,0 +1,110 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os.path
+import re
+from pathlib import Path
+
+from pyflink.table import Schema, DataTypes, TableDescriptor, PlanReference
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, 
PyFlinkTestCase
+
+THIS_DIR = os.path.dirname(os.path.abspath(__file__))
+JSON_PLAN_DIR = os.path.join(THIS_DIR, "jsonplan")
+
+
+def _replace_flink_version(plan: str) -> str:
+    """
+    Ignore the value for the Flink Version in the compiled plan.
+    """
+    return re.sub(r'"flinkVersion"\s*:\s*"[\w.-]*"', r'"flinkVersion" : ""', 
plan)
+
+
+def _replace_exec_node_id(plan: str) -> str:
+    """
+    Ignore id/source/target node ids, as these increment between test runs.
+    """
+    id = re.sub(r'"id"\s*:\s*\d+', r'"id" : 0', plan)
+    source = re.sub(r'"source"\s*:\s*\d+', r'"source" : 0', id)
+    target = re.sub(r'"target"\s*:\s*\d+', r'"target" : 0', source)
+    return target
+
+
+class CompiledPlanTest(PyFlinkStreamTableTestCase, PyFlinkTestCase):
+
+    def test_compile_plan_sql(self):
+        src = """
+        CREATE TABLE MyTable (a BIGINT, b INT, c VARCHAR)
+        WITH ('connector' = 'datagen', 'number-of-rows' = '5')
+        """
+        self.t_env.execute_sql(src)
+
+        sink = """
+        CREATE TABLE MySink (a BIGINT, b INT, c VARCHAR)
+        WITH ('connector' = 'blackhole')
+        """
+        self.t_env.execute_sql(sink)
+
+        compiled_plan = self.t_env.compile_plan_sql("INSERT INTO MySink SELECT 
* FROM MyTable")
+        with open(os.path.join(JSON_PLAN_DIR, "testGetJsonPlan.out")) as file:
+            expected = file.read()
+
+        self.maxDiff = None
+        self.assertEqual(
+            
_replace_exec_node_id(_replace_flink_version(compiled_plan.as_json_string())), 
expected
+        )
+
+    def test_write_load_compiled_plan(self):
+        schema = Schema.new_builder().column("f0", DataTypes.STRING()).build()
+        table = self.t_env.from_descriptor(
+            TableDescriptor.for_connector("datagen")
+            .option("number-of-rows", "10")
+            .schema(schema)
+            .build()
+        )
+        self.t_env.create_temporary_table(
+            "RegisteredSink",
+            TableDescriptor.for_connector("blackhole").schema(schema).build(),
+        )
+        table_pipeline = table.insert_into("RegisteredSink")
+        compiled_plan = table_pipeline.compile_plan()
+
+        plan_path = Path(self.tempdir + "/plan.out")
+        compiled_plan.write_to_file(plan_path)
+
+        plan_reference_from_file = PlanReference.from_file(plan_path)
+        compiled_plan_from_file = 
self.t_env.load_plan(plan_reference_from_file)
+        self.assertEqual(compiled_plan.as_json_string(), 
compiled_plan_from_file.as_json_string())
+
+        with open(plan_path) as file:
+            content = file.read()
+            plan_reference_from_string = 
PlanReference.from_json_string(content)
+            compiled_plan_from_string = 
self.t_env.load_plan(plan_reference_from_string)
+            self.assertEqual(
+                compiled_plan.as_json_string(), 
compiled_plan_from_string.as_json_string()
+            )
+
+
+if __name__ == "__main__":
+    import unittest
+
+    try:
+        import xmlrunner
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git 
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py 
b/flink-python/pyflink/table/tests/test_compiled_plan_completeness.py
similarity index 70%
copy from flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
copy to flink-python/pyflink/table/tests/test_compiled_plan_completeness.py
index 8ba2b10a572..4306978ae78 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_compiled_plan_completeness.py
@@ -15,36 +15,32 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
+from pyflink.table import CompiledPlan
 from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase, 
PyFlinkTestCase
-from pyflink.table.table_pipeline import TablePipeline
 
 
-class TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
+class CompiledPlanAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
     """
-    Tests whether the Python :class:`TablePipeline` is consistent with
-    Java `org.apache.flink.table.api.TablePipeline`.
+    Tests whether the Python :class:`CompiledPlan` is consistent with
+    Java `org.apache.flink.table.api.CompiledPlan`.
     """
 
     @classmethod
     def python_class(cls):
-        return TablePipeline
+        return CompiledPlan
 
     @classmethod
     def java_class(cls):
-        return "org.apache.flink.table.api.TablePipeline"
-
-    @classmethod
-    def excluded_methods(cls):
-        return {'printExplain', 'compilePlan'}
+        return "org.apache.flink.table.api.CompiledPlan"
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     import unittest
 
     try:
         import xmlrunner
-        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
     except ImportError:
         testRunner = None
     unittest.main(testRunner=testRunner, verbosity=2)
diff --git 
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py 
b/flink-python/pyflink/table/tests/test_plan_reference_completeness.py
similarity index 74%
copy from flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
copy to flink-python/pyflink/table/tests/test_plan_reference_completeness.py
index 8ba2b10a572..755e134387c 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_plan_reference_completeness.py
@@ -15,36 +15,36 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
+from pyflink.table import PlanReference
 from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase, 
PyFlinkTestCase
-from pyflink.table.table_pipeline import TablePipeline
 
 
-class TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
+class PlanReferenceAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
     """
-    Tests whether the Python :class:`TablePipeline` is consistent with
-    Java `org.apache.flink.table.api.TablePipeline`.
+    Tests whether the Python :class:`PlanReference` is consistent with
+    Java `org.apache.flink.table.api.PlanReference`.
     """
 
     @classmethod
     def python_class(cls):
-        return TablePipeline
+        return PlanReference
 
     @classmethod
     def java_class(cls):
-        return "org.apache.flink.table.api.TablePipeline"
+        return "org.apache.flink.table.api.PlanReference"
 
     @classmethod
     def excluded_methods(cls):
-        return {'printExplain', 'compilePlan'}
+        return {"fromResource"}
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     import unittest
 
     try:
         import xmlrunner
-        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
     except ImportError:
         testRunner = None
     unittest.main(testRunner=testRunner, verbosity=2)
diff --git 
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py 
b/flink-python/pyflink/table/tests/test_statement_set_completeness.py
similarity index 70%
copy from flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
copy to flink-python/pyflink/table/tests/test_statement_set_completeness.py
index 8ba2b10a572..ebc5e39feb9 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_statement_set_completeness.py
@@ -15,36 +15,32 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
+from pyflink.table import StatementSet
 from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase, 
PyFlinkTestCase
-from pyflink.table.table_pipeline import TablePipeline
 
 
-class TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
+class StatementSetAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
     """
-    Tests whether the Python :class:`TablePipeline` is consistent with
-    Java `org.apache.flink.table.api.TablePipeline`.
+    Tests whether the Python :class:`StatementSet` is consistent with
+    Java `org.apache.flink.table.api.StatementSet`.
     """
 
     @classmethod
     def python_class(cls):
-        return TablePipeline
+        return StatementSet
 
     @classmethod
     def java_class(cls):
-        return "org.apache.flink.table.api.TablePipeline"
-
-    @classmethod
-    def excluded_methods(cls):
-        return {'printExplain', 'compilePlan'}
+        return "org.apache.flink.table.api.StatementSet"
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     import unittest
 
     try:
         import xmlrunner
-        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
     except ImportError:
         testRunner = None
     unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py 
b/flink-python/pyflink/table/tests/test_table_completeness.py
index f01f9116b24..947c4be47ec 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -36,7 +36,7 @@ class 
TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
 
     @classmethod
     def excluded_methods(cls):
-        return {'createTemporalTableFunction', 'getQueryOperation', 
'printExplain'}
+        return {'createTemporalTableFunction', 'getQueryOperation'}
 
     @classmethod
     def java_method_name(cls, python_method_name):
diff --git 
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py 
b/flink-python/pyflink/table/tests/test_table_environment_completeness.py
similarity index 66%
copy from flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
copy to flink-python/pyflink/table/tests/test_table_environment_completeness.py
index 8ba2b10a572..b4fd310631f 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_completeness.py
@@ -15,36 +15,44 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
+from pyflink.table import TableEnvironment
 from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase, 
PyFlinkTestCase
-from pyflink.table.table_pipeline import TablePipeline
 
 
-class TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
+class TableEnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, 
PyFlinkTestCase):
     """
-    Tests whether the Python :class:`TablePipeline` is consistent with
-    Java `org.apache.flink.table.api.TablePipeline`.
+    Tests whether the Python :class:`TableEnvironment` is consistent with
+    Java `org.apache.flink.table.api.TableEnvironment`.
     """
 
     @classmethod
     def python_class(cls):
-        return TablePipeline
+        return TableEnvironment
 
     @classmethod
     def java_class(cls):
-        return "org.apache.flink.table.api.TablePipeline"
+        return "org.apache.flink.table.api.TableEnvironment"
 
     @classmethod
     def excluded_methods(cls):
-        return {'printExplain', 'compilePlan'}
-
-
-if __name__ == '__main__':
+        return {
+            "fromValues",
+            "createFunction",
+            "getCompletionHints",
+            "scan",
+            "registerTable",
+            "from",
+            "registerFunction",
+        }
+
+
+if __name__ == "__main__":
     import unittest
 
     try:
         import xmlrunner
-        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
     except ImportError:
         testRunner = None
     unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_pipeline.py 
b/flink-python/pyflink/table/tests/test_table_pipeline.py
index c7a596ebd81..ed6f4b0f933 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline.py
+++ b/flink-python/pyflink/table/tests/test_table_pipeline.py
@@ -82,6 +82,22 @@ class TablePipelineTest(PyFlinkStreamTableTestCase):
             "default_catalog.default_database.RegisteredSinkForExecute",
         )
 
+    def test_table_pipeline_compile(self):
+        schema = Schema.new_builder().column("f0", DataTypes.STRING()).build()
+        table = self.t_env.from_descriptor(
+            TableDescriptor.for_connector("datagen")
+            .option("number-of-rows", "10")
+            .schema(schema)
+            .build()
+        )
+        self.t_env.create_temporary_table(
+            "RegisteredSinkForPlanCompile",
+            TableDescriptor.for_connector("blackhole").schema(schema).build(),
+        )
+        table_pipeline = table.insert_into("RegisteredSinkForPlanCompile")
+        compiled_plan = table_pipeline.compile_plan()
+        self.assertIsNotNone(compiled_plan.as_json_string())
+
 
 if __name__ == "__main__":
     import unittest
diff --git 
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py 
b/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
index 8ba2b10a572..2942d35e771 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
@@ -34,10 +34,6 @@ class 
TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTe
     def java_class(cls):
         return "org.apache.flink.table.api.TablePipeline"
 
-    @classmethod
-    def excluded_methods(cls):
-        return {'printExplain', 'compilePlan'}
-
 
 if __name__ == '__main__':
     import unittest
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java
index e9b384d0e5f..643cf958db7 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java
@@ -36,7 +36,7 @@ public interface Compilable {
      * pipelines to ensure backwards compatibility and enable stateful 
streaming job upgrades. See
      * {@link CompiledPlan} and the website documentation for more information.
      *
-     * <p>Note: The compiled plan feature is not supported in batch mode.
+     * <p>Note: The compiled plan feature is experimental in batch mode.
      *
      * @throws TableException if any of the statements is invalid or if the 
plan cannot be
      *     persisted.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
index e9295e2d182..a1c90c7f017 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
@@ -49,6 +49,10 @@ import java.nio.file.Paths;
  * objects, only the identifier is part of the plan and the object needs to be 
present in the
  * session context during a restore.
  *
+ * <p>JSON encoding is assumed to be the default representation of a compiled 
plan in all API
+ * endpoints, and is the format used to persist the plan to files by default. 
For advanced use
+ * cases, {@link #asSmileBytes()} provides a binary format representation of 
the compiled plan.
+ *
  * <p>Note: Plan restores assume a stable session context. Configuration, 
loaded modules and
  * catalogs, and temporary objects must not change. Schema evolution and 
changes of function
  * signatures are not supported.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
index 9e628b71ca8..d7223485167 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
@@ -168,7 +168,7 @@ public abstract class PlanReference {
         }
     }
 
-    /** Plan reference to a string containing the serialized persisted plan in 
Smile. */
+    /** Plan reference to binary bytes containing the serialized persisted 
plan in Smile. */
     @PublicEvolving
     public static class BytesContentPlanReference extends PlanReference {
 
diff --git a/pom.xml b/pom.xml
index 4157a46f414..9581bd2664f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1641,6 +1641,7 @@ under the License.
                                                
<exclude>flink-runtime-web/web-dashboard/dev/notice-template</exclude>
                                                
<exclude>flink-examples/flink-examples-streaming/src/main/resources/datas/**</exclude>
                                                
<exclude>flink-examples/flink-examples-streaming/src/test/resources/datas/**</exclude>
+                                               
<exclude>flink-python/pyflink/table/tests/jsonplan/*</exclude>
                                                <!-- ArchUnit violation stores  
-->
                                                
<exclude>**/archunit-violations/**</exclude>
 

Reply via email to