This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d2710743eb5 [FLINK-30752][python] Support 'EXPLAIN PLAN_ADVICE'
statement in PyFlink
d2710743eb5 is described below
commit d2710743eb55e76e0e8d1fe8a5c0d6d791147c56
Author: Jane Chan <[email protected]>
AuthorDate: Thu Jan 19 14:54:58 2023 +0800
[FLINK-30752][python] Support 'EXPLAIN PLAN_ADVICE' statement in PyFlink
This closes #21727.
---
.../pyflink/examples/table/basic_operations.py | 43 +++++++++++++++++++++-
flink-python/pyflink/table/explain_detail.py | 3 ++
flink-python/pyflink/table/tests/test_explain.py | 10 ++++-
flink-python/pyflink/table/tests/test_sql.py | 9 ++++-
.../table/tests/test_table_environment_api.py | 2 +-
flink-python/pyflink/util/java_utils.py | 4 +-
6 files changed, 64 insertions(+), 7 deletions(-)
diff --git a/flink-python/pyflink/examples/table/basic_operations.py
b/flink-python/pyflink/examples/table/basic_operations.py
index 48e7abe4052..4b13f83b25c 100644
--- a/flink-python/pyflink/examples/table/basic_operations.py
+++ b/flink-python/pyflink/examples/table/basic_operations.py
@@ -20,7 +20,7 @@ import logging
import sys
from pyflink.common import Row
-from pyflink.table import (DataTypes, TableEnvironment, EnvironmentSettings)
+from pyflink.table import (DataTypes, TableEnvironment, EnvironmentSettings, ExplainDetail)
from pyflink.table.expressions import *
from pyflink.table.udf import udtf, udf, udaf, AggregateFunction,
TableAggregateFunction, udtaf
@@ -160,6 +160,25 @@ def basic_operations():
# +- Calc(select=[id, JSON_VALUE(data, '$.name', NULL, ON EMPTY, NULL, ON
ERROR) AS name, JSON_VALUE(data, '$.tel', NULL, ON EMPTY, NULL, ON ERROR) AS
tel, JSON_VALUE(data, '$.addr.country', NULL, ON EMPTY, NULL, ON ERROR) AS
country])
# +- LegacyTableSourceScan(table=[[default_catalog, default_database,
Unregistered_TableSource_249535355, source: [PythonInputFormatTableSource(id,
data)]]], fields=[id, data])
+    # show execute plan with advice
+    print(table.join_lateral(split.alias('a')).explain(ExplainDetail.PLAN_ADVICE))
+ # == Abstract Syntax Tree ==
+ # LogicalCorrelate(correlation=[$cor2], joinType=[inner],
requiredColumns=[{}])
+ # :- LogicalProject(id=[$0], name=[JSON_VALUE($1, _UTF-16LE'$.name',
FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], tel=[JSON_VALUE($1,
_UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))],
country=[JSON_VALUE($1, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY),
FLAG(NULL), FLAG(ON ERROR))])
+ # : +- LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
+ # +-
LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0,
$1, $2, $3)], rowType=[RecordType(VARCHAR(2147483647) a)])
+ #
+ # == Optimized Physical Plan With Advice ==
+ #
PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0,
$1, $2, $3)],
correlate=[table(*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*(id,name,tel,country))],
select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id,
VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country,
VARCHAR(2147483647) a)], joinType=[INNER])
+ # +- Calc(select=[id, JSON_VALUE(data, _UTF-16LE'$.name', FLAG(NULL),
FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS name, JSON_VALUE(data,
_UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS
tel, JSON_VALUE(data, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY),
FLAG(NULL), FLAG(ON ERROR)) AS country])
+ # +- TableSourceScan(table=[[*anonymous_python-input-format$1*]],
fields=[id, data])
+ #
+ # No available advice...
+ #
+ # == Optimized Execution Plan ==
+ #
PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0,
$1, $2, $3)],
correlate=[table(*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*(id,name,tel,country))],
select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id,
VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country,
VARCHAR(2147483647) a)], joinType=[INNER])
+ # +- Calc(select=[id, JSON_VALUE(data, '$.name', NULL, ON EMPTY, NULL, ON
ERROR) AS name, JSON_VALUE(data, '$.tel', NULL, ON EMPTY, NULL, ON ERROR) AS
tel, JSON_VALUE(data, '$.addr.country', NULL, ON EMPTY, NULL, ON ERROR) AS
country])
+ # +- TableSourceScan(table=[[*anonymous_python-input-format$1*]],
fields=[id, data])
def sql_operations():
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
@@ -228,6 +247,28 @@ def sql_operations():
# PythonCorrelate(invocation=[parse_data($1)],
correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2],
rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647)
f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
# +- LegacyTableSourceScan(table=[[default_catalog, default_database,
Unregistered_TableSource_734856049, source: [PythonInputFormatTableSource(id,
data)]]], fields=[id, data])
+ # explain sql plan with advice
+ print(t_env.explain_sql(
+ """
+ SELECT *
+ FROM %s, LATERAL TABLE(parse_data(`data`)) t(name, tel, country)
+ """ % table, ExplainDetail.PLAN_ADVICE
+ ))
+ # == Abstract Syntax Tree ==
+ # LogicalProject(id=[$0], data=[$1], name=[$2], tel=[$3], country=[$4])
+ # +- LogicalCorrelate(correlation=[$cor1], joinType=[inner],
requiredColumns=[{1}])
+ # :- LogicalTableScan(table=[[*anonymous_python-input-format$10*]])
+ # +- LogicalTableFunctionScan(invocation=[parse_data($cor2.data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1,
VARCHAR(2147483647) f2)])
+ #
+ # == Optimized Physical Plan With Advice ==
+ # PythonCorrelate(invocation=[parse_data($1)],
correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2],
rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647)
f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
+ # +- TableSourceScan(table=[[*anonymous_python-input-format$10*]],
fields=[id, data])
+ #
+ # No available advice...
+ #
+ # == Optimized Execution Plan ==
+ # PythonCorrelate(invocation=[parse_data($1)],
correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2],
rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647)
f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
+ # +- TableSourceScan(table=[[*anonymous_python-input-format$10*]],
fields=[id, data])
def column_operations():
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
diff --git a/flink-python/pyflink/table/explain_detail.py
b/flink-python/pyflink/table/explain_detail.py
index 12cbcdfce74..3b1e417a16d 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -37,3 +37,6 @@ class ExplainDetail(object):
# The execution plan in json format of the program.
JSON_EXECUTION_PLAN = 2
+
+ # The potential risk warnings and SQL optimizer tuning advice analyzed
from the physical plan.
+ PLAN_ADVICE = 3
diff --git a/flink-python/pyflink/table/tests/test_explain.py
b/flink-python/pyflink/table/tests/test_explain.py
index b8265d0059e..b2a6a601369 100644
--- a/flink-python/pyflink/table/tests/test_explain.py
+++ b/flink-python/pyflink/table/tests/test_explain.py
@@ -26,9 +26,15 @@ class StreamTableExplainTests(PyFlinkStreamTableTestCase):
def test_explain(self):
t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
result = t.group_by(t.c).select(t.a.sum, t.c.alias('b')).explain(
- ExplainDetail.CHANGELOG_MODE)
-
+ ExplainDetail.CHANGELOG_MODE, ExplainDetail.PLAN_ADVICE)
assert isinstance(result, str)
+ self.assertGreaterEqual(result.find('== Optimized Physical Plan With
Advice =='), 0)
+ self.assertGreaterEqual(result.find('advice[1]: [ADVICE] You might
want to enable '
+ 'local-global two-phase
optimization by configuring ('
+ '\'table.exec.mini-batch.enabled\'
to \'true\', '
+
'\'table.exec.mini-batch.allow-latency\' to a '
+ 'positive long value,
\'table.exec.mini-batch.size\' '
+ 'to a positive long value).'), 0)
result = t.group_by(t.c).select(t.a.sum, t.c.alias('b')).explain(
ExplainDetail.JSON_EXECUTION_PLAN)
diff --git a/flink-python/pyflink/table/tests/test_sql.py
b/flink-python/pyflink/table/tests/test_sql.py
index a6330cd3be6..a17a36d9f3b 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -21,7 +21,7 @@ import subprocess
from pyflink.find_flink_home import _find_flink_source_root
from pyflink.java_gateway import get_gateway
-from pyflink.table import ResultKind
+from pyflink.table import ResultKind, ExplainDetail
from pyflink.table import expressions as expr
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
@@ -37,7 +37,12 @@ class StreamSqlTests(PyFlinkStreamTableTestCase):
.alias("a", "b") \
.select(expr.call("func1", expr.col("a"), expr.col("b")))
plan = table.explain()
- self.assertTrue(plan.find("PythonCalc(select=[func1(f0, f1) AS _c0])")
>= 0)
+ self.assertGreaterEqual(plan.find("== Optimized Physical Plan =="), 0)
+ self.assertGreaterEqual(plan.find("PythonCalc(select=[func1(f0, f1) AS
_c0])"), 0)
+
+ plan = table.explain(ExplainDetail.PLAN_ADVICE)
+ self.assertGreaterEqual(plan.find("== Optimized Physical Plan With
Advice =="), 0)
+ self.assertGreaterEqual(plan.find("No available advice..."), 0)
def test_sql_query(self):
t_env = self.t_env
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py
b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 287dc268f83..8162651eb58 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -77,7 +77,7 @@ class TableEnvironmentTest(PyFlinkUTTestCase):
result = t.select(t.a + 1, t.b, t.c)
actual = result.explain(ExplainDetail.ESTIMATED_COST,
ExplainDetail.CHANGELOG_MODE,
-                                ExplainDetail.JSON_EXECUTION_PLAN)
+                                ExplainDetail.JSON_EXECUTION_PLAN, ExplainDetail.PLAN_ADVICE)
assert isinstance(actual, str)
diff --git a/flink-python/pyflink/util/java_utils.py
b/flink-python/pyflink/util/java_utils.py
index 1b9685c7d7c..b4d5b3c7a4c 100644
--- a/flink-python/pyflink/util/java_utils.py
+++ b/flink-python/pyflink/util/java_utils.py
@@ -184,8 +184,10 @@ def to_j_explain_detail_arr(p_extra_details):
return
gateway.jvm.org.apache.flink.table.api.ExplainDetail.JSON_EXECUTION_PLAN
elif p_extra_detail == ExplainDetail.CHANGELOG_MODE:
return
gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
-        else:
+        elif p_extra_detail == ExplainDetail.ESTIMATED_COST:
             return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
+        else:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.PLAN_ADVICE
_len = len(p_extra_details) if p_extra_details else 0
j_arr =
gateway.new_array(gateway.jvm.org.apache.flink.table.api.ExplainDetail, _len)