This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 79019bc  [FLINK-23280][python] Fix the issue that Python 
ExplainDetails does not have JSON_EXECUTION_PLAN option
79019bc is described below

commit 79019bc1196fb65a04f7533d6abc3776a4dcb652
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jul 7 13:10:08 2021 +0800

    [FLINK-23280][python] Fix the issue that Python ExplainDetails does not 
have JSON_EXECUTION_PLAN option
    
    This closes #16407.
---
 flink-python/pyflink/table/explain_detail.py                   | 3 +++
 flink-python/pyflink/table/tests/test_explain.py               | 9 +++++++++
 flink-python/pyflink/table/tests/test_table_environment_api.py | 6 ++++--
 flink-python/pyflink/util/java_utils.py                        | 4 +++-
 4 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/flink-python/pyflink/table/explain_detail.py 
b/flink-python/pyflink/table/explain_detail.py
index b3b549f..12cbcdf 100644
--- a/flink-python/pyflink/table/explain_detail.py
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -34,3 +34,6 @@ class ExplainDetail(object):
     # The changelog mode produced by a physical rel node.
     # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
     CHANGELOG_MODE = 1
+
+    # The execution plan in json format of the program.
+    JSON_EXECUTION_PLAN = 2
diff --git a/flink-python/pyflink/table/tests/test_explain.py 
b/flink-python/pyflink/table/tests/test_explain.py
index 06a7ce2..8fad6c1 100644
--- a/flink-python/pyflink/table/tests/test_explain.py
+++ b/flink-python/pyflink/table/tests/test_explain.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+import json
 
 from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase
 from pyflink.table.explain_detail import ExplainDetail
@@ -29,6 +30,14 @@ class 
StreamTableExplainTests(PyFlinkBlinkStreamTableTestCase):
 
         assert isinstance(result, str)
 
+        result = t.group_by("c").select(t.a.sum, t.c.alias('b')).explain(
+            ExplainDetail.JSON_EXECUTION_PLAN)
+        assert isinstance(result, str)
+        try:
+            json.loads(result.split('== Physical Execution Plan ==')[1])
+        except:
+            self.fail('The execution plan of explain detail is not in json 
format.')
+
 
 if __name__ == '__main__':
     import unittest
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py 
b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3774178..86be819 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -77,7 +77,8 @@ class TableEnvironmentTest(object):
         t = t_env.from_elements([], schema)
         result = t.select(t.a + 1, t.b, t.c)
 
-        actual = result.explain(ExplainDetail.ESTIMATED_COST, 
ExplainDetail.CHANGELOG_MODE)
+        actual = result.explain(ExplainDetail.ESTIMATED_COST, 
ExplainDetail.CHANGELOG_MODE,
+                                ExplainDetail.JSON_EXECUTION_PLAN)
 
         assert isinstance(actual, str)
 
@@ -970,7 +971,8 @@ class 
BlinkBatchTableEnvironmentTests(PyFlinkBlinkBatchTableTestCase):
         stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 
100" % source)
         stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 
100" % source)
 
-        actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, 
ExplainDetail.CHANGELOG_MODE)
+        actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, 
ExplainDetail.CHANGELOG_MODE,
+                                  ExplainDetail.JSON_EXECUTION_PLAN)
         self.assertIsInstance(actual, str)
 
     def test_register_java_function(self):
diff --git a/flink-python/pyflink/util/java_utils.py 
b/flink-python/pyflink/util/java_utils.py
index 8ea8d9b..df9c2c9 100644
--- a/flink-python/pyflink/util/java_utils.py
+++ b/flink-python/pyflink/util/java_utils.py
@@ -143,7 +143,9 @@ def to_j_explain_detail_arr(p_extra_details):
     gateway = get_gateway()
 
     def to_j_explain_detail(p_extra_detail):
-        if p_extra_detail == ExplainDetail.CHANGELOG_MODE:
+        if p_extra_detail == ExplainDetail.JSON_EXECUTION_PLAN:
+            return 
gateway.jvm.org.apache.flink.table.api.ExplainDetail.JSON_EXECUTION_PLAN
+        elif p_extra_detail == ExplainDetail.CHANGELOG_MODE:
             return 
gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE
         else:
             return 
gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST

Reply via email to