This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d2710743eb5 [FLINK-30752][python] Support 'EXPLAIN PLAN_ADVICE' statement in PyFlink
d2710743eb5 is described below

commit d2710743eb55e76e0e8d1fe8a5c0d6d791147c56
Author: Jane Chan <[email protected]>
AuthorDate: Thu Jan 19 14:54:58 2023 +0800

    [FLINK-30752][python] Support 'EXPLAIN PLAN_ADVICE' statement in PyFlink
    
    This closes #21727.
---
 .../pyflink/examples/table/basic_operations.py     | 43 +++++++++++++++++++++-
 flink-python/pyflink/table/explain_detail.py       |  3 ++
 flink-python/pyflink/table/tests/test_explain.py   | 10 ++++-
 flink-python/pyflink/table/tests/test_sql.py       |  9 ++++-
 .../table/tests/test_table_environment_api.py      |  2 +-
 flink-python/pyflink/util/java_utils.py            |  4 +-
 6 files changed, 64 insertions(+), 7 deletions(-)

diff --git a/flink-python/pyflink/examples/table/basic_operations.py b/flink-python/pyflink/examples/table/basic_operations.py
index 48e7abe4052..4b13f83b25c 100644
--- a/flink-python/pyflink/examples/table/basic_operations.py
+++ b/flink-python/pyflink/examples/table/basic_operations.py
@@ -20,7 +20,7 @@ import logging
 import sys
 
 from pyflink.common import Row
-from pyflink.table import (DataTypes, TableEnvironment, EnvironmentSettings)
+from pyflink.table import (DataTypes, TableEnvironment, EnvironmentSettings, ExplainDetail)
 from pyflink.table.expressions import *
 from pyflink.table.udf import udtf, udf, udaf, AggregateFunction, TableAggregateFunction, udtaf
 
@@ -160,6 +160,25 @@ def basic_operations():
     # +- Calc(select=[id, JSON_VALUE(data, '$.name', NULL, ON EMPTY, NULL, ON ERROR) AS name, JSON_VALUE(data, '$.tel', NULL, ON EMPTY, NULL, ON ERROR) AS tel, JSON_VALUE(data, '$.addr.country', NULL, ON EMPTY, NULL, ON ERROR) AS country])
     #    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_249535355, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
 
+    # show execute plan with advice
+    print(table.join_lateral(split.alias('a')).explain(ExplainDetail.PLAN_ADVICE))
+    # == Abstract Syntax Tree ==
+    # LogicalCorrelate(correlation=[$cor2], joinType=[inner], requiredColumns=[{}])
+    # :- LogicalProject(id=[$0], name=[JSON_VALUE($1, _UTF-16LE'$.name', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], tel=[JSON_VALUE($1, _UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], country=[JSON_VALUE($1, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))])
+    # :  +- LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
+    # +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0, $1, $2, $3)], rowType=[RecordType(VARCHAR(2147483647) a)])
+    #
+    # == Optimized Physical Plan With Advice ==
+    # PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0, $1, $2, $3)], correlate=[table(*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*(id,name,tel,country))], select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country, VARCHAR(2147483647) a)], joinType=[INNER])
+    # +- Calc(select=[id, JSON_VALUE(data, _UTF-16LE'$.name', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS name, JSON_VALUE(data, _UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS tel, JSON_VALUE(data, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS country])
+    #    +- TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, data])
+    #
+    # No available advice...
+    #
+    # == Optimized Execution Plan ==
+    # PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*($0, $1, $2, $3)], correlate=[table(*org.apache.flink.table.functions.python.PythonTableFunction$720258394f6a31d31376164d23142f53*(id,name,tel,country))], select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country, VARCHAR(2147483647) a)], joinType=[INNER])
+    # +- Calc(select=[id, JSON_VALUE(data, '$.name', NULL, ON EMPTY, NULL, ON ERROR) AS name, JSON_VALUE(data, '$.tel', NULL, ON EMPTY, NULL, ON ERROR) AS tel, JSON_VALUE(data, '$.addr.country', NULL, ON EMPTY, NULL, ON ERROR) AS country])
+    #    +- TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, data])
 
 def sql_operations():
     t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
@@ -228,6 +247,28 @@ def sql_operations():
     # PythonCorrelate(invocation=[parse_data($1)], correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
     # +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_734856049, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
 
+    # explain sql plan with advice
+    print(t_env.explain_sql(
+        """
+        SELECT *
+        FROM %s, LATERAL TABLE(parse_data(`data`)) t(name, tel, country)
+        """ % table, ExplainDetail.PLAN_ADVICE
+    ))
+    # == Abstract Syntax Tree ==
+    # LogicalProject(id=[$0], data=[$1], name=[$2], tel=[$3], country=[$4])
+    # +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
+    #    :- LogicalTableScan(table=[[*anonymous_python-input-format$10*]])
+    #    +- LogicalTableFunctionScan(invocation=[parse_data($cor2.data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)])
+    #
+    # == Optimized Physical Plan With Advice ==
+    # PythonCorrelate(invocation=[parse_data($1)], correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
+    # +- TableSourceScan(table=[[*anonymous_python-input-format$10*]], fields=[id, data])
+    #
+    # No available advice...
+    #
+    # == Optimized Execution Plan ==
+    # PythonCorrelate(invocation=[parse_data($1)], correlate=[table(parse_data(data))], select=[id,data,f0,f1,f2], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) data, VARCHAR(2147483647) f0, INTEGER f1, VARCHAR(2147483647) f2)], joinType=[INNER])
+    # +- TableSourceScan(table=[[*anonymous_python-input-format$10*]], fields=[id, data])
 
 def column_operations():
     t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
index 12cbcdfce74..3b1e417a16d 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -37,3 +37,6 @@ class ExplainDetail(object):
 
     # The execution plan in json format of the program.
     JSON_EXECUTION_PLAN = 2
+
+    # The potential risk warnings and SQL optimizer tuning advice analyzed from the physical plan.
+    PLAN_ADVICE = 3
diff --git a/flink-python/pyflink/table/tests/test_explain.py b/flink-python/pyflink/table/tests/test_explain.py
index b8265d0059e..b2a6a601369 100644
--- a/flink-python/pyflink/table/tests/test_explain.py
+++ b/flink-python/pyflink/table/tests/test_explain.py
@@ -26,9 +26,15 @@ class StreamTableExplainTests(PyFlinkStreamTableTestCase):
     def test_explain(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
         result = t.group_by(t.c).select(t.a.sum, t.c.alias('b')).explain(
-            ExplainDetail.CHANGELOG_MODE)
-
+            ExplainDetail.CHANGELOG_MODE, ExplainDetail.PLAN_ADVICE)
         assert isinstance(result, str)
+        self.assertGreaterEqual(result.find('== Optimized Physical Plan With Advice =='), 0)
+        self.assertGreaterEqual(result.find('advice[1]: [ADVICE] You might want to enable '
+                                            'local-global two-phase optimization by configuring ('
+                                            '\'table.exec.mini-batch.enabled\' to \'true\', '
+                                            '\'table.exec.mini-batch.allow-latency\' to a '
+                                            'positive long value, \'table.exec.mini-batch.size\' '
+                                            'to a positive long value).'), 0)
 
         result = t.group_by(t.c).select(t.a.sum, t.c.alias('b')).explain(
             ExplainDetail.JSON_EXECUTION_PLAN)
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index a6330cd3be6..a17a36d9f3b 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -21,7 +21,7 @@ import subprocess
 
 from pyflink.find_flink_home import _find_flink_source_root
 from pyflink.java_gateway import get_gateway
-from pyflink.table import ResultKind
+from pyflink.table import ResultKind, ExplainDetail
 from pyflink.table import expressions as expr
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
@@ -37,7 +37,12 @@ class StreamSqlTests(PyFlinkStreamTableTestCase):
             .alias("a", "b") \
             .select(expr.call("func1", expr.col("a"), expr.col("b")))
         plan = table.explain()
-        self.assertTrue(plan.find("PythonCalc(select=[func1(f0, f1) AS _c0])") >= 0)
+        self.assertGreaterEqual(plan.find("== Optimized Physical Plan =="), 0)
+        self.assertGreaterEqual(plan.find("PythonCalc(select=[func1(f0, f1) AS _c0])"), 0)
+
+        plan = table.explain(ExplainDetail.PLAN_ADVICE)
+        self.assertGreaterEqual(plan.find("== Optimized Physical Plan With Advice =="), 0)
+        self.assertGreaterEqual(plan.find("No available advice..."), 0)
 
     def test_sql_query(self):
         t_env = self.t_env
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 287dc268f83..8162651eb58 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -77,7 +77,7 @@ class TableEnvironmentTest(PyFlinkUTTestCase):
         result = t.select(t.a + 1, t.b, t.c)
 
         actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE,
-                                ExplainDetail.JSON_EXECUTION_PLAN)
+                                ExplainDetail.JSON_EXECUTION_PLAN, ExplainDetail.PLAN_ADVICE)
 
         assert isinstance(actual, str)
 
diff --git a/flink-python/pyflink/util/java_utils.py b/flink-python/pyflink/util/java_utils.py
index 1b9685c7d7c..b4d5b3c7a4c 100644
--- a/flink-python/pyflink/util/java_utils.py
+++ b/flink-python/pyflink/util/java_utils.py
@@ -184,8 +184,10 @@ def to_j_explain_detail_arr(p_extra_details):
             return gateway.jvm.org.apache.flink.table.api.ExplainDetail.JSON_EXECUTION_PLAN
         elif p_extra_detail == ExplainDetail.CHANGELOG_MODE:
             return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
-        else:
+        elif p_extra_detail == ExplainDetail.ESTIMATED_COST:
             return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
+        else:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.PLAN_ADVICE
 
     _len = len(p_extra_details) if p_extra_details else 0
     j_arr = gateway.new_array(gateway.jvm.org.apache.flink.table.api.ExplainDetail, _len)

Reply via email to