This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 79019bc [FLINK-23280][python] Fix the issue that Python ExplainDetails does not have JSON_EXECUTION_PLAN option
79019bc is described below
commit 79019bc1196fb65a04f7533d6abc3776a4dcb652
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jul 7 13:10:08 2021 +0800
[FLINK-23280][python] Fix the issue that Python ExplainDetails does not have JSON_EXECUTION_PLAN option
This closes #16407.
---
flink-python/pyflink/table/explain_detail.py | 3 +++
flink-python/pyflink/table/tests/test_explain.py | 9 +++++++++
flink-python/pyflink/table/tests/test_table_environment_api.py | 6 ++++--
flink-python/pyflink/util/java_utils.py | 4 +++-
4 files changed, 19 insertions(+), 3 deletions(-)
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
index b3b549f..12cbcdf 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -34,3 +34,6 @@ class ExplainDetail(object):
# The changelog mode produced by a physical rel node.
# e.g. GroupAggregate(..., changelogMode=[I,UA,D])
CHANGELOG_MODE = 1
+
+ # The execution plan in json format of the program.
+ JSON_EXECUTION_PLAN = 2
diff --git a/flink-python/pyflink/table/tests/test_explain.py b/flink-python/pyflink/table/tests/test_explain.py
index 06a7ce2..8fad6c1 100644
--- a/flink-python/pyflink/table/tests/test_explain.py
+++ b/flink-python/pyflink/table/tests/test_explain.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+import json
from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase
from pyflink.table.explain_detail import ExplainDetail
@@ -29,6 +30,14 @@ class StreamTableExplainTests(PyFlinkBlinkStreamTableTestCase):
assert isinstance(result, str)
+ result = t.group_by("c").select(t.a.sum, t.c.alias('b')).explain(
+ ExplainDetail.JSON_EXECUTION_PLAN)
+ assert isinstance(result, str)
+ try:
+ json.loads(result.split('== Physical Execution Plan ==')[1])
+ except:
+ self.fail('The execution plan of explain detail is not in json format.')
+
if __name__ == '__main__':
import unittest
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3774178..86be819 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -77,7 +77,8 @@ class TableEnvironmentTest(object):
t = t_env.from_elements([], schema)
result = t.select(t.a + 1, t.b, t.c)
- actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
+ actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE,
+ ExplainDetail.JSON_EXECUTION_PLAN)
assert isinstance(actual, str)
@@ -970,7 +971,8 @@ class BlinkBatchTableEnvironmentTests(PyFlinkBlinkBatchTableTestCase):
stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)
- actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
+ actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE,
+ ExplainDetail.JSON_EXECUTION_PLAN)
self.assertIsInstance(actual, str)
def test_register_java_function(self):
diff --git a/flink-python/pyflink/util/java_utils.py b/flink-python/pyflink/util/java_utils.py
index 8ea8d9b..df9c2c9 100644
--- a/flink-python/pyflink/util/java_utils.py
+++ b/flink-python/pyflink/util/java_utils.py
@@ -143,7 +143,9 @@ def to_j_explain_detail_arr(p_extra_details):
gateway = get_gateway()
def to_j_explain_detail(p_extra_detail):
- if p_extra_detail == ExplainDetail.CHANGELOG_MODE:
+ if p_extra_detail == ExplainDetail.JSON_EXECUTION_PLAN:
+ return gateway.jvm.org.apache.flink.table.api.ExplainDetail.JSON_EXECUTION_PLAN
+ elif p_extra_detail == ExplainDetail.CHANGELOG_MODE:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
else:
return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST