This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3c31e646a8e [FLINK-25986][python] Add remaining FLIP-190 API methods
to Python Table API
3c31e646a8e is described below
commit 3c31e646a8efc13943b33d012a70d748cf11cdbc
Author: Mika Naylor <[email protected]>
AuthorDate: Fri Mar 14 10:20:47 2025 +0100
[FLINK-25986][python] Add remaining FLIP-190 API methods to Python Table API
This closes #26281.
---
.../reference/pyflink.table/compiled_plans.rst | 98 +++++++++++++++
.../docs/reference/pyflink.table/descriptors.rst | 8 +-
.../docs/reference/pyflink.table/index.rst | 1 +
.../docs/reference/pyflink.table/statement_set.rst | 8 +-
.../docs/reference/pyflink.table/table.rst | 1 +
.../reference/pyflink.table/table_environment.rst | 9 +-
flink-python/pyflink/table/__init__.py | 6 +
flink-python/pyflink/table/compiled_plan.py | 140 +++++++++++++++++++++
flink-python/pyflink/table/plan_reference.py | 130 +++++++++++++++++++
flink-python/pyflink/table/resolved_expression.py | 2 +-
flink-python/pyflink/table/schema.py | 3 +-
flink-python/pyflink/table/statement_set.py | 30 +++++
flink-python/pyflink/table/table.py | 9 ++
flink-python/pyflink/table/table_environment.py | 64 ++++++++++
flink-python/pyflink/table/table_pipeline.py | 30 +++++
.../table/tests/jsonplan/testGetJsonPlan.out | 87 +++++++++++++
.../pyflink/table/tests/test_compiled_plan.py | 110 ++++++++++++++++
...eness.py => test_compiled_plan_completeness.py} | 22 ++--
...ness.py => test_plan_reference_completeness.py} | 20 +--
...eness.py => test_statement_set_completeness.py} | 22 ++--
.../pyflink/table/tests/test_table_completeness.py | 2 +-
...s.py => test_table_environment_completeness.py} | 32 +++--
.../pyflink/table/tests/test_table_pipeline.py | 16 +++
.../tests/test_table_pipeline_completeness.py | 4 -
.../org/apache/flink/table/api/Compilable.java | 2 +-
.../org/apache/flink/table/api/CompiledPlan.java | 4 +
.../org/apache/flink/table/api/PlanReference.java | 2 +-
pom.xml | 1 +
28 files changed, 796 insertions(+), 67 deletions(-)
diff --git a/flink-python/docs/reference/pyflink.table/compiled_plans.rst
b/flink-python/docs/reference/pyflink.table/compiled_plans.rst
new file mode 100644
index 00000000000..94f45fc8e41
--- /dev/null
+++ b/flink-python/docs/reference/pyflink.table/compiled_plans.rst
@@ -0,0 +1,98 @@
+..
################################################################################
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
################################################################################
+
+
+==============
+Compiled Plans
+==============
+
+CompiledPlan
+------------
+
+Represents an immutable, fully optimized, and executable entity that has been
compiled from a
+Table & SQL API pipeline definition. It encodes operators, expressions,
functions, data types,
+and table connectors.
+
+Every new Flink version might introduce improved optimizer rules, more
efficient operators,
+and other changes that impact the behavior of previously defined pipelines. In
order to ensure
+backwards compatibility and enable stateful streaming job upgrades, compiled
plans can be
+persisted and reloaded across Flink versions. See the
+`website documentation <https://flink.apache.org/documentation/>`_ for more
information about
+provided guarantees during stateful pipeline upgrades.
+
+A plan can be compiled from a SQL query using
+:func:`~pyflink.table.TableEnvironment.compile_plan_sql`.
+It can be persisted using :func:`~pyflink.table.CompiledPlan.write_to_file` or
by manually
+extracting the JSON representation with :func:`~pyflink.table.CompiledPlan.as_json_string`.
+A plan can be loaded back from a file or a string using
+:func:`~pyflink.table.TableEnvironment.load_plan` with a
:class:`~pyflink.table.PlanReference`.
+Instances can be executed using :func:`~pyflink.table.CompiledPlan.execute`.
+
+Depending on the configuration, permanent catalog metadata (such as
information about tables
+and functions) will be persisted in the plan as well. Anonymous/inline objects will be
+persisted (including schema and options) if possible; otherwise, the compilation fails.
+For temporary objects, only the identifier is part of the plan and the object
needs to be
+present in the session context during a restore.
+
+JSON encoding is assumed to be the default representation of a compiled plan
in all API
+endpoints, and is the format used to persist the plan to files by default.
+For advanced use cases, :func:`~pyflink.table.CompiledPlan.as_smile_bytes`
provides a binary
+format representation of the compiled plan.
+
+.. note::
+ Plan restores assume a stable session context. Configuration, loaded
modules and
+ catalogs, and temporary objects must not change. Schema evolution and
changes of function
+ signatures are not supported.
+
+.. currentmodule:: pyflink.table
+
+.. autosummary::
+ :toctree: api/
+
+ CompiledPlan.as_json_string
+ CompiledPlan.as_smile_bytes
+ CompiledPlan.write_to_file
+ CompiledPlan.get_flink_version
+ CompiledPlan.print_json_string
+ CompiledPlan.execute
+ CompiledPlan.explain
+ CompiledPlan.print_explain
+
+PlanReference
+-------------
+
+Unresolved pointer to a persisted plan.
+
+A plan represents a static, executable entity that has been compiled from a
Table & SQL API
+pipeline definition.
+
+You can load the content of this reference into a
:class:`~pyflink.table.CompiledPlan`
+using :func:`~pyflink.table.TableEnvironment.load_plan` with a
+:class:`~pyflink.table.PlanReference`, or you can directly load and execute it
with
+:func:`~pyflink.table.TableEnvironment.execute_plan`.
+
+.. seealso:: :class:`~pyflink.table.CompiledPlan`
+
+.. currentmodule:: pyflink.table
+
+.. autosummary::
+ :toctree: api/
+
+ PlanReference.from_file
+ PlanReference.from_json_string
+ PlanReference.from_smile_bytes
diff --git a/flink-python/docs/reference/pyflink.table/descriptors.rst
b/flink-python/docs/reference/pyflink.table/descriptors.rst
index d606c4c11eb..55b198114b9 100644
--- a/flink-python/docs/reference/pyflink.table/descriptors.rst
+++ b/flink-python/docs/reference/pyflink.table/descriptors.rst
@@ -111,7 +111,7 @@ TableSchema
A table schema that represents a table's structure with field names and data
types.
-.. currentmodule:: pyflink.table.table_schema
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
@@ -132,7 +132,7 @@ ChangelogMode
The set of changes contained in a changelog.
-.. currentmodule:: pyflink.table.changelog_mode
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
@@ -147,11 +147,13 @@ TablePipeline
Describes a complete pipeline from one or more source tables to a sink table.
-.. currentmodule:: pyflink.table.table_pipeline
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
+ TablePipeline.compile_plan
TablePipeline.execute
TablePipeline.explain
+ TablePipeline.print_explain
TablePipeline.get_sink_identifier
diff --git a/flink-python/docs/reference/pyflink.table/index.rst
b/flink-python/docs/reference/pyflink.table/index.rst
index b4d26648ad8..326fb86327b 100644
--- a/flink-python/docs/reference/pyflink.table/index.rst
+++ b/flink-python/docs/reference/pyflink.table/index.rst
@@ -34,3 +34,4 @@ This page gives an overview of all public PyFlink Table API.
descriptors
statement_set
catalog
+ compiled_plans
diff --git a/flink-python/docs/reference/pyflink.table/statement_set.rst
b/flink-python/docs/reference/pyflink.table/statement_set.rst
index a822cb63e3d..74ea74d5a43 100644
--- a/flink-python/docs/reference/pyflink.table/statement_set.rst
+++ b/flink-python/docs/reference/pyflink.table/statement_set.rst
@@ -28,7 +28,7 @@ The planner can optimize all added statements together and
then submit them as o
The added statements will be cleared when calling the
:func:`~StatementSet.execute` method.
-.. currentmodule:: pyflink.table.statement_set
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
@@ -36,7 +36,9 @@ The added statements will be cleared when calling the
:func:`~StatementSet.execu
StatementSet.add_insert_sql
StatementSet.attach_as_datastream
StatementSet.add_insert
+ StatementSet.compile_plan
StatementSet.explain
+ StatementSet.print_explain
StatementSet.execute
@@ -45,7 +47,7 @@ TableResult
A :class:`~pyflink.table.TableResult` is the representation of the statement
execution result.
-.. currentmodule:: pyflink.table.table_result
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
@@ -73,7 +75,7 @@ The statement (e.g. DDL, USE) executes successfully, and the
result only contain
The statement (e.g. DML, DQL, SHOW) executes successfully, and the result
contains important
content.
-.. currentmodule:: pyflink.table.table_result
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
diff --git a/flink-python/docs/reference/pyflink.table/table.rst
b/flink-python/docs/reference/pyflink.table/table.rst
index 4c6843d8c4c..c4bd4eadfac 100644
--- a/flink-python/docs/reference/pyflink.table/table.rst
+++ b/flink-python/docs/reference/pyflink.table/table.rst
@@ -88,6 +88,7 @@ Example:
Table.execute
Table.execute_insert
Table.explain
+ Table.print_explain
Table.fetch
Table.filter
Table.flat_aggregate
diff --git a/flink-python/docs/reference/pyflink.table/table_environment.rst
b/flink-python/docs/reference/pyflink.table/table_environment.rst
index 0a444d0d862..5825bd4a567 100644
--- a/flink-python/docs/reference/pyflink.table/table_environment.rst
+++ b/flink-python/docs/reference/pyflink.table/table_environment.rst
@@ -42,7 +42,7 @@ Example:
:func:`~EnvironmentSettings.in_streaming_mode` or
:func:`~EnvironmentSettings.in_batch_mode`
might be convenient as shortcuts.
-.. currentmodule:: pyflink.table.environment_settings
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
@@ -144,13 +144,14 @@ keyword, thus must be escaped) in a catalog named 'cat.1'
and database named 'db
other Flink APIs, it might be necessary to use one of the available
language-specific table
environments in the corresponding bridging modules.
-.. currentmodule:: pyflink.table.table_environment
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
TableEnvironment.add_python_archive
TableEnvironment.add_python_file
+ TableEnvironment.compile_plan_sql
TableEnvironment.create
TableEnvironment.create_java_function
TableEnvironment.create_java_temporary_function
@@ -169,6 +170,7 @@ keyword, thus must be escaped) in a catalog named 'cat.1'
and database named 'db
TableEnvironment.drop_temporary_table
TableEnvironment.drop_temporary_view
TableEnvironment.drop_view
+ TableEnvironment.execute_plan
TableEnvironment.execute_sql
TableEnvironment.explain_sql
TableEnvironment.from_descriptor
@@ -190,6 +192,7 @@ keyword, thus must be escaped) in a catalog named 'cat.1'
and database named 'db
TableEnvironment.list_user_defined_functions
TableEnvironment.list_views
TableEnvironment.load_module
+ TableEnvironment.load_plan
TableEnvironment.create_catalog
TableEnvironment.register_catalog
TableEnvironment.set_python_requirements
@@ -202,7 +205,7 @@ keyword, thus must be escaped) in a catalog named 'cat.1'
and database named 'db
StreamTableEnvironment
----------------------
-.. currentmodule:: pyflink.table.table_environment
+.. currentmodule:: pyflink.table
.. autosummary::
:toctree: api/
diff --git a/flink-python/pyflink/table/__init__.py
b/flink-python/pyflink/table/__init__.py
index 861afacb941..ad7b8b8b3ef 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -113,11 +113,13 @@ Other important classes:
from __future__ import absolute_import
from pyflink.table.changelog_mode import ChangelogMode
+from pyflink.table.compiled_plan import CompiledPlan
from pyflink.table.data_view import DataView, ListView, MapView
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.explain_detail import ExplainDetail
from pyflink.table.expression import Expression
from pyflink.table.module import Module, ModuleEntry
+from pyflink.table.plan_reference import PlanReference
from pyflink.table.result_kind import ResultKind
from pyflink.table.schema import Schema
from pyflink.table.sql_dialect import SqlDialect
@@ -127,6 +129,7 @@ from pyflink.table.table import GroupWindowedTable,
GroupedTable, OverWindowedTa
from pyflink.table.table_config import TableConfig
from pyflink.table.table_descriptor import TableDescriptor, FormatDescriptor
from pyflink.table.table_environment import (TableEnvironment,
StreamTableEnvironment)
+from pyflink.table.table_pipeline import TablePipeline
from pyflink.table.table_result import TableResult
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes, UserDefinedType, Row, RowKind
@@ -168,4 +171,7 @@ __all__ = [
'ChangelogMode',
'ExplainDetail',
'ResultKind',
+ 'CompiledPlan',
+ 'PlanReference',
+ 'TablePipeline'
]
diff --git a/flink-python/pyflink/table/compiled_plan.py
b/flink-python/pyflink/table/compiled_plan.py
new file mode 100644
index 00000000000..be224ed91d8
--- /dev/null
+++ b/flink-python/pyflink/table/compiled_plan.py
@@ -0,0 +1,140 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pathlib import Path
+from typing import Union
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table.explain_detail import ExplainDetail
+from pyflink.table.table_result import TableResult
+from pyflink.util.java_utils import to_j_explain_detail_arr
+
+__all__ = ["CompiledPlan"]
+
+
+class CompiledPlan(object):
+    """
+    Represents an immutable, fully optimized, and executable entity that has been compiled from a
+    Table & SQL API pipeline definition. It encodes operators, expressions, functions, data types,
+    and table connectors.
+
+    Every new Flink version might introduce improved optimizer rules, more efficient operators,
+    and other changes that impact the behavior of previously defined pipelines. In order to ensure
+    backwards compatibility and enable stateful streaming job upgrades, compiled plans can be
+    persisted and reloaded across Flink versions. See the website documentation for more
+    information about provided guarantees during stateful pipeline upgrades.
+
+    A plan can be compiled from a SQL query using
+    :func:`~pyflink.table.TableEnvironment.compile_plan_sql`.
+    It can be persisted using :func:`~pyflink.table.CompiledPlan.write_to_file` or by manually
+    extracting the JSON representation with :func:`~pyflink.table.CompiledPlan.as_json_string`.
+    A plan can be loaded back from a file or a string using
+    :func:`~pyflink.table.TableEnvironment.load_plan` with a :class:`~pyflink.table.PlanReference`.
+    Instances can be executed using :func:`~pyflink.table.CompiledPlan.execute`.
+
+    Depending on the configuration, permanent catalog metadata (such as information about tables
+    and functions) will be persisted in the plan as well. Anonymous/inline objects will be
+    persisted (including schema and options) if possible; otherwise, the compilation fails.
+    For temporary objects, only the identifier is part of the plan and the object needs to be
+    present in the session context during a restore.
+
+    JSON encoding is assumed to be the default representation of a compiled plan in all API
+    endpoints, and is the format used to persist the plan to files by default.
+    For advanced use cases, :func:`~pyflink.table.CompiledPlan.as_smile_bytes` provides a binary
+    format representation of the compiled plan.
+
+    .. note::
+        Plan restores assume a stable session context. Configuration, loaded modules and
+        catalogs, and temporary objects must not change. Schema evolution and changes of function
+        signatures are not supported.
+    """
+
+    def __init__(self, j_compiled_plan, t_env):
+        self._j_compiled_plan = j_compiled_plan
+        self._t_env = t_env  # needs _before_execute() (see execute()): the Python TableEnvironment
+
+    def __str__(self) -> str:
+        return self._j_compiled_plan.toString()
+
+    def as_json_string(self) -> str:
+        """
+        Convert the plan to a JSON string representation.
+        """
+        return self._j_compiled_plan.asJsonString()
+
+    def as_smile_bytes(self) -> bytes:
+        """
+        Convert the plan to a Smile binary representation.
+        """
+        return self._j_compiled_plan.asSmileBytes()
+
+    def write_to_file(self, file: Union[str, Path], ignore_if_exists: bool = False):
+        """
+        Writes this plan as JSON to ``file``, which may be a ``str`` or :class:`pathlib.Path`.
+
+        :param ignore_if_exists: If a plan exists in the given file path and this flag is true,
+            no operation is executed and the plan is not overwritten. An exception is thrown
+            otherwise.
+        :raises TableException: if the file cannot be written or if ``ignore_if_exists`` is false
+            and a plan already exists.
+        """
+        self._j_compiled_plan.writeToFile(str(file), ignore_if_exists)
+
+    def get_flink_version(self) -> str:
+        """
+        Returns the Flink version used to compile the plan.
+        """
+        return str(self._j_compiled_plan.getFlinkVersion())
+
+    def print_json_string(self):
+        """
+        Like :func:`~pyflink.table.CompiledPlan.as_json_string`, but prints the result to the
+        client console.
+
+        .. versionadded:: 2.1.0
+        """
+        print(self.as_json_string())  # Python-side print, consistent with print_explain()
+
+    def execute(self) -> TableResult:
+        """
+        Executes the compiled plan.
+        """
+        self._t_env._before_execute()
+        return TableResult(self._j_compiled_plan.execute())
+
+    def explain(self, *extra_details: ExplainDetail) -> str:
+        """
+        Returns the AST and the execution plan of the compiled plan.
+
+        :param extra_details: The extra explain details which the explain result should include,
+            e.g. estimated cost, changelog mode for streaming
+        :return: AST and execution plans
+        """
+        gateway = get_gateway()
+        j_extra_details = to_j_explain_detail_arr(extra_details)
+        return self._j_compiled_plan.explain(
+            gateway.jvm.org.apache.flink.table.api.ExplainFormat.TEXT, j_extra_details
+        )
+
+    def print_explain(self, *extra_details: ExplainDetail):
+        """
+        Like :func:`~pyflink.table.CompiledPlan.explain`, but prints the result to the client
+        console.
+
+        .. versionadded:: 2.1.0
+        """
+        print(self.explain(*extra_details))
diff --git a/flink-python/pyflink/table/plan_reference.py
b/flink-python/pyflink/table/plan_reference.py
new file mode 100644
index 00000000000..d3fa02f767e
--- /dev/null
+++ b/flink-python/pyflink/table/plan_reference.py
@@ -0,0 +1,130 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import Union
+
+from pyflink.java_gateway import get_gateway
+from pathlib import Path
+
+__all__ = [
+ "PlanReference",
+ "FilePlanReference",
+ "JsonContentPlanReference",
+ "BytesContentPlanReference",
+]
+
+
+class PlanReference(object):
+    """
+    Unresolved pointer to a persisted plan.
+
+    A plan is a static, executable entity that has been compiled from a Table & SQL API
+    pipeline definition. Instances are obtained through the ``from_*`` factory methods.
+
+    The referenced content can be loaded into a :class:`~pyflink.table.CompiledPlan` via
+    :func:`~pyflink.table.TableEnvironment.load_plan`, or loaded and executed in one step via
+    :func:`~pyflink.table.TableEnvironment.execute_plan`.
+
+    .. seealso:: :class:`~pyflink.table.CompiledPlan`
+    """
+
+    def __init__(self, j_plan_reference):
+        # Java-side reference, as returned by the Java ``PlanReference.from*`` factories.
+        self._j_plan_reference = j_plan_reference
+
+    def __str__(self):
+        return self._j_plan_reference.toString()
+
+    def __hash__(self):
+        # Delegates to Java so hashing stays consistent with the Java ``equals`` used below.
+        return self._j_plan_reference.hashCode()
+
+    def __eq__(self, other):
+        if not isinstance(other, self.__class__):
+            return False
+        return self._j_plan_reference.equals(other._j_plan_reference)
+
+    @staticmethod
+    def from_file(path: Union[str, Path]) -> "FilePlanReference":
+        """
+        Create a reference to the plan stored in the file at ``path``.
+        """
+        j_api = get_gateway().jvm.org.apache.flink.table.api
+        return FilePlanReference(j_plan_reference=j_api.PlanReference.fromFile(str(path)))
+
+    @staticmethod
+    def from_json_string(json_string: str) -> "JsonContentPlanReference":
+        """
+        Create a reference wrapping a plan serialized as a JSON string.
+        """
+        j_api = get_gateway().jvm.org.apache.flink.table.api
+        return JsonContentPlanReference(
+            j_plan_reference=j_api.PlanReference.fromJsonString(json_string)
+        )
+
+    @staticmethod
+    def from_smile_bytes(smile_bytes: bytes) -> "BytesContentPlanReference":
+        """
+        Create a reference wrapping a plan serialized in the Smile binary format.
+        """
+        j_api = get_gateway().jvm.org.apache.flink.table.api
+        return BytesContentPlanReference(
+            j_plan_reference=j_api.PlanReference.fromSmileBytes(smile_bytes)
+        )
+
+
+class FilePlanReference(PlanReference):
+    """
+    Plan reference that points to a plan file in the local filesystem.
+    """
+
+    def get_file(self) -> Path:
+        """
+        Return the canonical path of the referenced plan file as a :class:`pathlib.Path`.
+        """
+        canonical = self._j_plan_reference.getFile().getCanonicalPath()
+        return Path(canonical)
+
+
+class JsonContentPlanReference(PlanReference):
+    """
+    Plan reference holding a serialized, persisted plan as a JSON string.
+    """
+
+    def get_content(self) -> str:
+        """
+        Return the JSON text of the referenced plan.
+        """
+        json_text = self._j_plan_reference.getContent()
+        return json_text
+
+
+class BytesContentPlanReference(PlanReference):
+    """
+    Plan reference holding a serialized, persisted plan as Smile-encoded binary bytes.
+    """
+
+    def get_content(self) -> bytes:
+        """
+        Return the Smile-encoded bytes of the referenced plan.
+        """
+        smile_payload = self._j_plan_reference.getContent()
+        return smile_payload
diff --git a/flink-python/pyflink/table/resolved_expression.py
b/flink-python/pyflink/table/resolved_expression.py
index b5953e4e4d5..166bc4ebb79 100644
--- a/flink-python/pyflink/table/resolved_expression.py
+++ b/flink-python/pyflink/table/resolved_expression.py
@@ -17,7 +17,7 @@
################################################################################
from typing import List
-from pyflink.table import Expression
+from pyflink.table.expression import Expression
from pyflink.table.types import DataType, _from_java_data_type
__all__ = ["ResolvedExpression"]
diff --git a/flink-python/pyflink/table/schema.py
b/flink-python/pyflink/table/schema.py
index c70a8c01fee..666bbe61532 100644
--- a/flink-python/pyflink/table/schema.py
+++ b/flink-python/pyflink/table/schema.py
@@ -18,8 +18,7 @@
from typing import Union, List, TYPE_CHECKING
from pyflink.java_gateway import get_gateway
-from pyflink.table import Expression
-from pyflink.table.expression import _get_java_expression
+from pyflink.table.expression import Expression, _get_java_expression
from pyflink.table.types import DataType, _to_java_data_type
from pyflink.util.java_utils import to_jarray
diff --git a/flink-python/pyflink/table/statement_set.py
b/flink-python/pyflink/table/statement_set.py
index 07ece0a56b1..456c342c958 100644
--- a/flink-python/pyflink/table/statement_set.py
+++ b/flink-python/pyflink/table/statement_set.py
@@ -19,6 +19,7 @@ from typing import Union
from pyflink.java_gateway import get_gateway
from pyflink.table import ExplainDetail
+from pyflink.table.compiled_plan import CompiledPlan
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.table_pipeline import TablePipeline
from pyflink.table.table_result import TableResult
@@ -156,6 +157,15 @@ class StatementSet(object):
j_extra_details = to_j_explain_detail_arr(extra_details)
return self._j_statement_set.explain(TEXT, j_extra_details)
+    def print_explain(self, *extra_details: ExplainDetail):
+        """
+        Print the output of :func:`~pyflink.table.StatementSet.explain` to the client console.
+
+        .. versionadded:: 2.1.0
+        """
+        explanation = self.explain(*extra_details)
+        print(explanation)
+
def execute(self) -> TableResult:
"""
execute all statements and Tables as a batch.
@@ -169,3 +179,23 @@ class StatementSet(object):
"""
self._t_env._before_execute()
return TableResult(self._j_statement_set.execute())
+
+    def compile_plan(self) -> CompiledPlan:
+        """
+        Compiles all statements into a :class:`~pyflink.table.CompiledPlan` that can be executed
+        as one job.
+
+        Compiled plans can be persisted and reloaded across Flink versions. They describe static
+        pipelines to ensure backwards compatibility and enable stateful streaming job upgrades.
+        See :class:`~pyflink.table.CompiledPlan` and the website documentation for more
+        information.
+
+        .. note::
+            The compiled plan feature is experimental in batch mode.
+
+        :raises TableException: if any of the statements is invalid or if the plan cannot be
+            persisted.
+
+        .. versionadded:: 2.1.0
+        """
+        return CompiledPlan(j_compiled_plan=self._j_statement_set.compilePlan(), t_env=self._t_env)
diff --git a/flink-python/pyflink/table/table.py
b/flink-python/pyflink/table/table.py
index 8726d3d52b0..0a4a9cd8489 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1090,6 +1090,15 @@ class Table(object):
j_extra_details = to_j_explain_detail_arr(extra_details)
return self._j_table.explain(TEXT, j_extra_details)
+    def print_explain(self, *extra_details: ExplainDetail):
+        """
+        Print the output of :func:`~pyflink.table.Table.explain` to the client console.
+
+        .. versionadded:: 2.1.0
+        """
+        explanation = self.explain(*extra_details)
+        print(explanation)
+
def insert_into(
self, table_path_or_descriptor: Union[str, TableDescriptor],
overwrite: bool = False
) -> TablePipeline:
diff --git a/flink-python/pyflink/table/table_environment.py
b/flink-python/pyflink/table/table_environment.py
index 245cf1ea148..61ac1553242 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -34,6 +34,8 @@ from pyflink.serializers import BatchedSerializer,
PickleSerializer
from pyflink.table import Table, EnvironmentSettings, Expression,
ExplainDetail, \
Module, ModuleEntry, Schema, ChangelogMode
from pyflink.table.catalog import Catalog, CatalogDescriptor
+from pyflink.table.compiled_plan import CompiledPlan
+from pyflink.table.plan_reference import PlanReference
from pyflink.table.serializers import ArrowSerializer
from pyflink.table.statement_set import StatementSet
from pyflink.table.table_config import TableConfig
@@ -1409,6 +1411,68 @@ class TableEnvironment(object):
finally:
os.unlink(temp_file.name)
+    def load_plan(self, plan_reference: PlanReference) -> CompiledPlan:
+        """
+        Loads a plan from a :class:`~pyflink.table.PlanReference` into a
+        :class:`~pyflink.table.CompiledPlan`.
+
+        Compiled plans can be persisted and reloaded across Flink versions. They describe static
+        pipelines to ensure backwards compatibility and enable stateful streaming job upgrades. See
+        :class:`~pyflink.table.CompiledPlan` and the website documentation for more information.
+
+        This method will parse the input reference and will validate the plan. The returned
+        instance can be executed via :func:`~pyflink.table.CompiledPlan.execute`.
+
+        .. note::
+            The compiled plan feature is experimental in batch mode.
+
+        :raises TableException: if the plan cannot be loaded from the filesystem, or if the plan
+            is invalid.
+
+        .. versionadded:: 2.1.0
+        """
+        return CompiledPlan(
+            j_compiled_plan=self._j_tenv.loadPlan(plan_reference._j_plan_reference),
+            t_env=self,  # Python env: CompiledPlan.execute() calls its _before_execute()
+        )
+
+    def compile_plan_sql(self, stmt: str) -> CompiledPlan:
+        """
+        Compiles a SQL DML statement into a :class:`~pyflink.table.CompiledPlan`.
+
+        Compiled plans can be persisted and reloaded across Flink versions. They describe static
+        pipelines to ensure backwards compatibility and enable stateful streaming job upgrades. See
+        :class:`~pyflink.table.CompiledPlan` and the website documentation for more information.
+
+        .. note::
+            Only ``INSERT INTO`` is supported at the moment.
+
+        .. note::
+            The compiled plan feature is experimental in batch mode.
+
+        .. seealso::
+            :func:`~pyflink.table.TableEnvironment.load_plan`
+            :func:`~pyflink.table.CompiledPlan.execute`
+
+        :raises TableException: if the SQL statement is invalid or if the plan cannot be
+            persisted.
+
+        .. versionadded:: 2.1.0
+        """
+        return CompiledPlan(j_compiled_plan=self._j_tenv.compilePlanSql(stmt), t_env=self)
+
+    def execute_plan(self, plan_reference: PlanReference) -> TableResult:
+        """
+        Loads the plan behind ``plan_reference`` and executes it in a single call.
+
+        .. seealso::
+            :func:`~pyflink.table.TableEnvironment.load_plan`
+            :func:`~pyflink.table.CompiledPlan.execute`
+
+        .. versionadded:: 2.1.0
+        """
+        return self.load_plan(plan_reference=plan_reference).execute()
+
def _set_python_executable_for_local_executor(self):
jvm = get_gateway().jvm
j_config = get_j_env_configuration(self._get_j_env())
diff --git a/flink-python/pyflink/table/table_pipeline.py
b/flink-python/pyflink/table/table_pipeline.py
index e2c82ac2ab4..64900e0df1e 100644
--- a/flink-python/pyflink/table/table_pipeline.py
+++ b/flink-python/pyflink/table/table_pipeline.py
@@ -20,6 +20,7 @@ from typing import Optional
from pyflink.java_gateway import get_gateway
from pyflink.table import ExplainDetail
from pyflink.table.catalog import ObjectIdentifier
+from pyflink.table.compiled_plan import CompiledPlan
from pyflink.table.table_result import TableResult
from pyflink.util.java_utils import to_j_explain_detail_arr
@@ -63,6 +64,15 @@ class TablePipeline(object):
gateway.jvm.org.apache.flink.table.api.ExplainFormat.TEXT,
j_extra_details
)
+    def print_explain(self, *extra_details: ExplainDetail):
+        """
+        Print the output of :func:`~pyflink.table.TablePipeline.explain` to the client console.
+
+        .. versionadded:: 2.1.0
+        """
+        explanation = self.explain(*extra_details)
+        print(explanation)
+
def get_sink_identifier(self) -> Optional[ObjectIdentifier]:
"""
Returns the sink table's
:class:`~pyflink.table.catalog.ObjectIdentifier`, if any.
@@ -79,3 +89,23 @@ class TablePipeline(object):
if optional_result.isPresent()
else None
)
+
+    def compile_plan(self) -> CompiledPlan:
+        """
+        Compiles this :class:`TablePipeline` into a :class:`~pyflink.table.CompiledPlan` that can
+        be executed as one job.
+
+        Compiled plans can be persisted and reloaded across Flink versions. They describe static
+        pipelines to ensure backwards compatibility and enable stateful streaming job upgrades.
+        See :class:`~pyflink.table.CompiledPlan` and the website documentation for more
+        information.
+
+        .. note::
+            The compiled plan feature is experimental in batch mode.
+
+        :return: The :class:`~pyflink.table.CompiledPlan` compiled from this pipeline.
+        :raises TableException: if any of the statements is invalid or if the plan cannot be
+                                persisted.
+
+        .. versionadded:: 2.1.0
+        """
+        j_compiled_plan = self._j_table_pipeline.compilePlan()
+        return CompiledPlan(j_compiled_plan=j_compiled_plan, t_env=self._t_env)
diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
new file mode 100644
index 00000000000..7fcfad4bd7f
--- /dev/null
+++ b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
@@ -0,0 +1,87 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 0,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "datagen",
+ "number-of-rows" : "5"
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 0,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "blackhole"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[a, b, c])"
+ } ],
+ "edges" : [ {
+ "source" : 0,
+ "target" : 0,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-python/pyflink/table/tests/test_compiled_plan.py
b/flink-python/pyflink/table/tests/test_compiled_plan.py
new file mode 100644
index 00000000000..fe0675e5939
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_compiled_plan.py
@@ -0,0 +1,110 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os.path
+import re
+from pathlib import Path
+
+from pyflink.table import Schema, DataTypes, TableDescriptor, PlanReference
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase,
PyFlinkTestCase
+
+THIS_DIR = os.path.dirname(os.path.abspath(__file__))
+# Directory holding the expected compiled-plan fixtures (e.g. testGetJsonPlan.out).
+JSON_PLAN_DIR = os.path.join(THIS_DIR, "jsonplan")
+
+
+def _replace_flink_version(plan: str) -> str:
+    """
+    Blank out the ``flinkVersion`` value of a serialized plan, so that comparing it against
+    the expected fixture does not depend on the Flink version that produced it.
+    """
+    version_pattern = r'"flinkVersion"\s*:\s*"[\w.-]*"'
+    return re.sub(version_pattern, '"flinkVersion" : ""', plan)
+
+
+def _replace_exec_node_id(plan: str) -> str:
+    """
+    Normalize the ``id``, ``source`` and ``target`` exec node ids to 0, as these increment
+    between test runs.
+    """
+    # Applied in the same order as before (id, source, target); the loop variable avoids
+    # shadowing the ``id`` builtin.
+    for key in ("id", "source", "target"):
+        plan = re.sub(rf'"{key}"\s*:\s*\d+', f'"{key}" : 0', plan)
+    return plan
+
+
+class CompiledPlanTest(PyFlinkStreamTableTestCase, PyFlinkTestCase):
+    """Tests compiling, persisting and reloading :class:`~pyflink.table.CompiledPlan`."""
+
+    def test_compile_plan_sql(self):
+        src = """
+            CREATE TABLE MyTable (a BIGINT, b INT, c VARCHAR)
+            WITH ('connector' = 'datagen', 'number-of-rows' = '5')
+        """
+        self.t_env.execute_sql(src)
+
+        sink = """
+            CREATE TABLE MySink (a BIGINT, b INT, c VARCHAR)
+            WITH ('connector' = 'blackhole')
+        """
+        self.t_env.execute_sql(sink)
+
+        compiled_plan = self.t_env.compile_plan_sql("INSERT INTO MySink SELECT * FROM MyTable")
+        with open(os.path.join(JSON_PLAN_DIR, "testGetJsonPlan.out"), encoding="utf-8") as file:
+            expected = file.read()
+
+        # The JSON plan is long; show the full diff on failure.
+        self.maxDiff = None
+        # Normalize the volatile parts (Flink version, generated node ids) before comparing.
+        actual = _replace_exec_node_id(_replace_flink_version(compiled_plan.as_json_string()))
+        self.assertEqual(actual, expected)
+
+    def test_write_load_compiled_plan(self):
+        schema = Schema.new_builder().column("f0", DataTypes.STRING()).build()
+        table = self.t_env.from_descriptor(
+            TableDescriptor.for_connector("datagen")
+            .option("number-of-rows", "10")
+            .schema(schema)
+            .build()
+        )
+        self.t_env.create_temporary_table(
+            "RegisteredSink",
+            TableDescriptor.for_connector("blackhole").schema(schema).build(),
+        )
+        table_pipeline = table.insert_into("RegisteredSink")
+        compiled_plan = table_pipeline.compile_plan()
+
+        plan_path = Path(self.tempdir) / "plan.out"
+        compiled_plan.write_to_file(plan_path)
+
+        # Round trip 1: reload the persisted plan through a file reference.
+        plan_reference_from_file = PlanReference.from_file(plan_path)
+        compiled_plan_from_file = self.t_env.load_plan(plan_reference_from_file)
+        self.assertEqual(
+            compiled_plan.as_json_string(), compiled_plan_from_file.as_json_string()
+        )
+
+        # Round trip 2: reload the same plan from its JSON string content.
+        content = plan_path.read_text(encoding="utf-8")
+        plan_reference_from_string = PlanReference.from_json_string(content)
+        compiled_plan_from_string = self.t_env.load_plan(plan_reference_from_string)
+        self.assertEqual(
+            compiled_plan.as_json_string(), compiled_plan_from_string.as_json_string()
+        )
+
+
+if __name__ == "__main__":
+    import unittest
+
+    # Emit JUnit-style XML reports when xmlrunner is installed; otherwise use the default runner.
+    try:
+        import xmlrunner
+    except ImportError:
+        testRunner = None
+    else:
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
b/flink-python/pyflink/table/tests/test_compiled_plan_completeness.py
similarity index 70%
copy from flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
copy to flink-python/pyflink/table/tests/test_compiled_plan_completeness.py
index 8ba2b10a572..4306978ae78 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_compiled_plan_completeness.py
@@ -15,36 +15,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
+from pyflink.table import CompiledPlan
from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase,
PyFlinkTestCase
-from pyflink.table.table_pipeline import TablePipeline
-class TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
+class CompiledPlanAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
"""
- Tests whether the Python :class:`TablePipeline` is consistent with
- Java `org.apache.flink.table.api.TablePipeline`.
+ Tests whether the Python :class:`CompiledPlan` is consistent with
+ Java `org.apache.flink.table.api.CompiledPlan`.
"""
@classmethod
def python_class(cls):
- return TablePipeline
+ return CompiledPlan
@classmethod
def java_class(cls):
- return "org.apache.flink.table.api.TablePipeline"
-
- @classmethod
- def excluded_methods(cls):
- return {'printExplain', 'compilePlan'}
+ return "org.apache.flink.table.api.CompiledPlan"
-if __name__ == '__main__':
+if __name__ == "__main__":
import unittest
try:
import xmlrunner
- testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+
+ testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
diff --git
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
b/flink-python/pyflink/table/tests/test_plan_reference_completeness.py
similarity index 74%
copy from flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
copy to flink-python/pyflink/table/tests/test_plan_reference_completeness.py
index 8ba2b10a572..755e134387c 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_plan_reference_completeness.py
@@ -15,36 +15,36 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
+from pyflink.table import PlanReference
from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase,
PyFlinkTestCase
-from pyflink.table.table_pipeline import TablePipeline
-class TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
+class PlanReferenceAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
"""
- Tests whether the Python :class:`TablePipeline` is consistent with
- Java `org.apache.flink.table.api.TablePipeline`.
+ Tests whether the Python :class:`PlanReference` is consistent with
+ Java `org.apache.flink.table.api.PlanReference`.
"""
@classmethod
def python_class(cls):
- return TablePipeline
+ return PlanReference
@classmethod
def java_class(cls):
- return "org.apache.flink.table.api.TablePipeline"
+ return "org.apache.flink.table.api.PlanReference"
@classmethod
def excluded_methods(cls):
- return {'printExplain', 'compilePlan'}
+ return {"fromResource"}
-if __name__ == '__main__':
+if __name__ == "__main__":
import unittest
try:
import xmlrunner
- testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+
+ testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
diff --git
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
b/flink-python/pyflink/table/tests/test_statement_set_completeness.py
similarity index 70%
copy from flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
copy to flink-python/pyflink/table/tests/test_statement_set_completeness.py
index 8ba2b10a572..ebc5e39feb9 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_statement_set_completeness.py
@@ -15,36 +15,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
+from pyflink.table import StatementSet
from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase,
PyFlinkTestCase
-from pyflink.table.table_pipeline import TablePipeline
-class TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
+class StatementSetAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
"""
- Tests whether the Python :class:`TablePipeline` is consistent with
- Java `org.apache.flink.table.api.TablePipeline`.
+ Tests whether the Python :class:`StatementSet` is consistent with
+ Java `org.apache.flink.table.api.StatementSet`.
"""
@classmethod
def python_class(cls):
- return TablePipeline
+ return StatementSet
@classmethod
def java_class(cls):
- return "org.apache.flink.table.api.TablePipeline"
-
- @classmethod
- def excluded_methods(cls):
- return {'printExplain', 'compilePlan'}
+ return "org.apache.flink.table.api.StatementSet"
-if __name__ == '__main__':
+if __name__ == "__main__":
import unittest
try:
import xmlrunner
- testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+
+ testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py
b/flink-python/pyflink/table/tests/test_table_completeness.py
index f01f9116b24..947c4be47ec 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -36,7 +36,7 @@ class
TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
@classmethod
def excluded_methods(cls):
- return {'createTemporalTableFunction', 'getQueryOperation',
'printExplain'}
+ return {'createTemporalTableFunction', 'getQueryOperation'}
@classmethod
def java_method_name(cls, python_method_name):
diff --git
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
b/flink-python/pyflink/table/tests/test_table_environment_completeness.py
similarity index 66%
copy from flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
copy to flink-python/pyflink/table/tests/test_table_environment_completeness.py
index 8ba2b10a572..b4fd310631f 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_completeness.py
@@ -15,36 +15,44 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
+from pyflink.table import TableEnvironment
from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase,
PyFlinkTestCase
-from pyflink.table.table_pipeline import TablePipeline
-class TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
+class TableEnvironmentAPICompletenessTests(PythonAPICompletenessTestCase,
PyFlinkTestCase):
"""
- Tests whether the Python :class:`TablePipeline` is consistent with
- Java `org.apache.flink.table.api.TablePipeline`.
+ Tests whether the Python :class:`TableEnvironment` is consistent with
+ Java `org.apache.flink.table.api.TableEnvironment`.
"""
@classmethod
def python_class(cls):
- return TablePipeline
+ return TableEnvironment
@classmethod
def java_class(cls):
- return "org.apache.flink.table.api.TablePipeline"
+ return "org.apache.flink.table.api.TableEnvironment"
@classmethod
def excluded_methods(cls):
- return {'printExplain', 'compilePlan'}
-
-
-if __name__ == '__main__':
+ return {
+ "fromValues",
+ "createFunction",
+ "getCompletionHints",
+ "scan",
+ "registerTable",
+ "from",
+ "registerFunction",
+ }
+
+
+if __name__ == "__main__":
import unittest
try:
import xmlrunner
- testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+
+ testRunner = xmlrunner.XMLTestRunner(output="target/test-reports")
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/tests/test_table_pipeline.py
b/flink-python/pyflink/table/tests/test_table_pipeline.py
index c7a596ebd81..ed6f4b0f933 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline.py
+++ b/flink-python/pyflink/table/tests/test_table_pipeline.py
@@ -82,6 +82,22 @@ class TablePipelineTest(PyFlinkStreamTableTestCase):
"default_catalog.default_database.RegisteredSinkForExecute",
)
+    def test_table_pipeline_compile(self):
+        schema = Schema.new_builder().column("f0", DataTypes.STRING()).build()
+        table = self.t_env.from_descriptor(
+            TableDescriptor.for_connector("datagen")
+            .option("number-of-rows", "10")
+            .schema(schema)
+            .build()
+        )
+        self.t_env.create_temporary_table(
+            "RegisteredSinkForPlanCompile",
+            TableDescriptor.for_connector("blackhole").schema(schema).build(),
+        )
+        table_pipeline = table.insert_into("RegisteredSinkForPlanCompile")
+        compiled_plan = table_pipeline.compile_plan()
+
+        plan_json = compiled_plan.as_json_string()
+        self.assertIsNotNone(plan_json)
+        # A non-None check alone passes for any string; also verify that the serialized plan
+        # actually references the sink this pipeline writes to.
+        self.assertIn("RegisteredSinkForPlanCompile", plan_json)
+
if __name__ == "__main__":
import unittest
diff --git
a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
b/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
index 8ba2b10a572..2942d35e771 100644
--- a/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_pipeline_completeness.py
@@ -34,10 +34,6 @@ class
TablePipelineAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTe
def java_class(cls):
return "org.apache.flink.table.api.TablePipeline"
- @classmethod
- def excluded_methods(cls):
- return {'printExplain', 'compilePlan'}
-
if __name__ == '__main__':
import unittest
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java
index e9b384d0e5f..643cf958db7 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Compilable.java
@@ -36,7 +36,7 @@ public interface Compilable {
* pipelines to ensure backwards compatibility and enable stateful
streaming job upgrades. See
* {@link CompiledPlan} and the website documentation for more information.
*
- * <p>Note: The compiled plan feature is not supported in batch mode.
+ * <p>Note: The compiled plan feature is experimental in batch mode.
*
* @throws TableException if any of the statements is invalid or if the
plan cannot be
* persisted.
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
index e9295e2d182..a1c90c7f017 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/CompiledPlan.java
@@ -49,6 +49,10 @@ import java.nio.file.Paths;
* objects, only the identifier is part of the plan and the object needs to be
present in the
* session context during a restore.
*
+ * <p>JSON encoding is assumed to be the default representation of a compiled
plan in all API
+ * endpoints, and is the format used to persist the plan to files by default.
For advanced use
+ * cases, {@link #asSmileBytes()} provides a binary format representation of
the compiled plan.
+ *
* <p>Note: Plan restores assume a stable session context. Configuration,
loaded modules and
* catalogs, and temporary objects must not change. Schema evolution and
changes of function
* signatures are not supported.
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
index 9e628b71ca8..d7223485167 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlanReference.java
@@ -168,7 +168,7 @@ public abstract class PlanReference {
}
}
- /** Plan reference to a string containing the serialized persisted plan in
Smile. */
+ /** Plan reference to binary bytes containing the serialized persisted
plan in Smile. */
@PublicEvolving
public static class BytesContentPlanReference extends PlanReference {
diff --git a/pom.xml b/pom.xml
index 4157a46f414..9581bd2664f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1641,6 +1641,7 @@ under the License.
<exclude>flink-runtime-web/web-dashboard/dev/notice-template</exclude>
<exclude>flink-examples/flink-examples-streaming/src/main/resources/datas/**</exclude>
<exclude>flink-examples/flink-examples-streaming/src/test/resources/datas/**</exclude>
+
<exclude>flink-python/pyflink/table/tests/jsonplan/*</exclude>
<!-- ArchUnit violation stores
-->
<exclude>**/archunit-violations/**</exclude>