This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d7e64a  [hotfix][python] Aligns with Java Table API by removing 
methods exec_env and query_config
2d7e64a is described below

commit 2d7e64aa481ad03069ecdbdffbc8a22254e94d72
Author: Dian Fu <[email protected]>
AuthorDate: Thu Jun 27 15:47:50 2019 +0800

    [hotfix][python] Aligns with Java Table API by removing methods exec_env 
and query_config
    
    This closes #8910
---
 flink-python/dev/pip_test_code.py                  |  6 +--
 .../dataset/tests/test_execution_environment.py    |  2 +-
 .../tests/test_stream_execution_environment.py     |  2 +-
 flink-python/pyflink/shell.py                      | 37 +++++++++-------
 .../pyflink/table/examples/batch/word_count.py     | 10 +++--
 flink-python/pyflink/table/table.py                |  2 +-
 flink-python/pyflink/table/table_environment.py    | 51 +---------------------
 flink-python/pyflink/table/tests/test_calc.py      |  2 +-
 .../pyflink/table/tests/test_descriptor.py         |  4 +-
 .../pyflink/table/tests/test_shell_example.py      | 12 ++---
 .../table/tests/test_table_environment_api.py      | 17 ++++----
 flink-python/tox.ini                               |  2 +-
 12 files changed, 53 insertions(+), 94 deletions(-)

diff --git a/flink-python/dev/pip_test_code.py 
b/flink-python/dev/pip_test_code.py
index c9a4798..29f9841 100644
--- a/flink-python/dev/pip_test_code.py
+++ b/flink-python/dev/pip_test_code.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 # test pyflink shell environment
-from pyflink.shell import bt_env, FileSystem, OldCsv, DataTypes, Schema
+from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
 
 import tempfile
 import os
@@ -28,7 +28,7 @@ if os.path.exists(sink_path):
         os.remove(sink_path)
     else:
         shutil.rmtree(sink_path)
-bt_env.exec_env().set_parallelism(1)
+b_env.set_parallelism(1)
 t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 
'c'])
 bt_env.connect(FileSystem().path(sink_path)) \
     .with_format(OldCsv()
@@ -44,7 +44,7 @@ bt_env.connect(FileSystem().path(sink_path)) \
 
 t.select("a + 1, b, c").insert_into("batch_sink")
 
-bt_env.exec_env().execute()
+b_env.execute()
 
 with open(sink_path, 'r') as f:
     lines = f.read()
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment.py 
b/flink-python/pyflink/dataset/tests/test_execution_environment.py
index 7adac00..9dbed96 100644
--- a/flink-python/pyflink/dataset/tests/test_execution_environment.py
+++ b/flink-python/pyflink/dataset/tests/test_execution_environment.py
@@ -110,6 +110,6 @@ class ExecutionEnvironmentTests(PyFlinkTestCase):
             CsvTableSink(field_names, field_types, tmp_csv))
         t_env.scan("Orders").insert_into("Results")
 
-        plan = t_env.exec_env().get_execution_plan()
+        plan = self.env.get_execution_plan()
 
         json.loads(plan)
diff --git 
a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py 
b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 43768fa..b02f660 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -187,6 +187,6 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
             CsvTableSink(field_names, field_types, tmp_csv))
         t_env.scan("Orders").insert_into("Results")
 
-        plan = t_env.exec_env().get_execution_plan()
+        plan = self.env.get_execution_plan()
 
         json.loads(plan)
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index 2ea3a6b..97b7065 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -20,8 +20,9 @@ import codecs
 import platform
 import sys
 
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.common import *
+from pyflink.dataset import *
+from pyflink.datastream import *
 from pyflink.table import *
 from pyflink.table.catalog import *
 from pyflink.table.descriptors import *
@@ -80,7 +81,7 @@ welcome_msg = u'''
 
 NOTE: Use the prebound Table Environment to implement batch or streaming Table 
programs.
 
-  Batch - Use the 'bt_env' variable
+  Batch - Use 'b_env' and 'bt_env' variables
 
     *
     * import tempfile
@@ -92,25 +93,25 @@ NOTE: Use the prebound Table Environment to implement batch 
or streaming Table p
     *         os.remove(sink_path)
     *     else:
     *         shutil.rmtree(sink_path)
-    * bt_env.exec_env().set_parallelism(1)
+    * b_env.set_parallelism(1)
     * t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
-    * bt_env.connect(FileSystem().path(sink_path))\
+    * bt_env.connect(FileSystem().path(sink_path)) \\
     *     .with_format(OldCsv()
     *                  .field_delimiter(',')
     *                  .field("a", DataTypes.BIGINT())
     *                  .field("b", DataTypes.STRING())
-    *                  .field("c", DataTypes.STRING()))\
+    *                  .field("c", DataTypes.STRING())) \\
     *     .with_schema(Schema()
     *                  .field("a", DataTypes.BIGINT())
     *                  .field("b", DataTypes.STRING())
-    *                  .field("c", DataTypes.STRING()))\
+    *                  .field("c", DataTypes.STRING())) \\
     *     .register_table_sink("batch_sink")
     *
     * t.select("a + 1, b, c").insert_into("batch_sink")
     *
-    * bt_env.exec_env().execute()
+    * b_env.execute()
 
-  Streaming - Use the 'st_env' variable
+  Streaming - Use 's_env' and 'st_env' variables
 
     *
     * import tempfile
@@ -122,26 +123,30 @@ NOTE: Use the prebound Table Environment to implement 
batch or streaming Table p
     *         os.remove(sink_path)
     *     else:
     *         shutil.rmtree(sink_path)
-    * st_env.exec_env().set_parallelism(1)
+    * s_env.set_parallelism(1)
     * t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 
'b', 'c'])
-    * st_env.connect(FileSystem().path(sink_path))\\
+    * st_env.connect(FileSystem().path(sink_path)) \\
     *     .with_format(OldCsv()
     *                  .field_delimiter(',')
     *                  .field("a", DataTypes.BIGINT())
     *                  .field("b", DataTypes.STRING())
-    *                  .field("c", DataTypes.STRING()))\\
+    *                  .field("c", DataTypes.STRING())) \\
     *     .with_schema(Schema()
     *                  .field("a", DataTypes.BIGINT())
     *                  .field("b", DataTypes.STRING())
-    *                  .field("c", DataTypes.STRING()))\\
+    *                  .field("c", DataTypes.STRING())) \\
     *     .register_table_sink("stream_sink")
     * 
     * t.select("a + 1, b, c").insert_into("stream_sink")
     *
-    * st_env.exec_env().execute()
+    * s_env.execute()
 '''
 utf8_out.write(welcome_msg)
 
-bt_env = 
BatchTableEnvironment.create(ExecutionEnvironment.get_execution_environment())
+b_env = ExecutionEnvironment.get_execution_environment()
 
-st_env = 
StreamTableEnvironment.create(StreamExecutionEnvironment.get_execution_environment())
+bt_env = BatchTableEnvironment.create(b_env)
+
+s_env = StreamExecutionEnvironment.get_execution_environment()
+
+st_env = StreamTableEnvironment.create(s_env)
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py 
b/flink-python/pyflink/table/examples/batch/word_count.py
index 721e002..c3bc1e2 100644
--- a/flink-python/pyflink/table/examples/batch/word_count.py
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -21,7 +21,8 @@ import shutil
 import sys
 import tempfile
 
-from pyflink.table import TableConfig, TableEnvironment
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import BatchTableEnvironment, TableConfig
 from pyflink.table.descriptors import FileSystem, OldCsv, Schema
 from pyflink.table.types import DataTypes
 
@@ -35,8 +36,9 @@ def word_count():
               "License you may not use this file except in compliance " \
               "with the License"
 
-    t_config = TableConfig.Builder().as_batch_execution().build()
-    t_env = TableEnvironment.create(t_config)
+    t_config = TableConfig()
+    env = ExecutionEnvironment.get_execution_environment()
+    t_env = BatchTableEnvironment.create(env, t_config)
 
     # register Results table in table environment
     tmp_dir = tempfile.gettempdir()
@@ -68,7 +70,7 @@ def word_count():
          .select("word, count(1) as count") \
          .insert_into("Results")
 
-    t_env.exec_env().execute()
+    env.execute()
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index a02daf2..508ea17 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -52,7 +52,7 @@ class Table(object):
         ...
         >>> t_env.register_table_sink("result", ...)
         >>> t.insert_into("result")
-        >>> t_env.exec_env().execute()
+        >>> env.execute()
 
     Operations such as :func:`~pyflink.table.Table.join`, 
:func:`~pyflink.table.Table.select`,
     :func:`~pyflink.table.Table.where` and 
:func:`~pyflink.table.Table.group_by`
diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index bfb7f32..9c4eabe 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -19,11 +19,9 @@ import os
 import tempfile
 from abc import ABCMeta, abstractmethod
 
-from pyflink.dataset import ExecutionEnvironment
 from pyflink.serializers import BatchedSerializer, PickleSerializer
 from pyflink.table.catalog import Catalog
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table.query_config import StreamQueryConfig, BatchQueryConfig, 
QueryConfig
+from pyflink.table.query_config import QueryConfig
 from pyflink.table.table_config import TableConfig
 from pyflink.table.descriptors import (StreamTableDescriptor, 
ConnectorDescriptor,
                                        BatchTableDescriptor)
@@ -383,13 +381,6 @@ class TableEnvironment(object):
         self._j_tenv.useDatabase(database_name)
 
     @abstractmethod
-    def exec_env(self):
-        """
-        :return: The execution environment of this table environment.
-        """
-        pass
-
-    @abstractmethod
     def get_config(self):
         """
         Returns the table config to define the runtime behavior of the Table 
API.
@@ -399,16 +390,6 @@ class TableEnvironment(object):
         pass
 
     @abstractmethod
-    def query_config(self):
-        """
-        Returns a :class:`StreamQueryConfig` that holds parameters to 
configure the behavior of
-        streaming queries.
-
-        :return: A new :class:`StreamQueryConfig` or :class:`BatchQueryConfig`.
-        """
-        pass
-
-    @abstractmethod
     def connect(self, connector_descriptor):
         """
         Creates a table source and/or table sink from a descriptor.
@@ -548,15 +529,6 @@ class StreamTableEnvironment(TableEnvironment):
         table_config._j_table_config = self._j_tenv.getConfig()
         return table_config
 
-    def query_config(self):
-        """
-        Returns a :class:`StreamQueryConfig` that holds parameters to 
configure the behavior of
-        streaming queries.
-
-        :return: A new :class:`StreamQueryConfig`.
-        """
-        return StreamQueryConfig()
-
     def connect(self, connector_descriptor):
         """
         Creates a table source and/or table sink from a descriptor.
@@ -588,12 +560,6 @@ class StreamTableEnvironment(TableEnvironment):
         return StreamTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
-    def exec_env(self):
-        """
-        :return: The stream execution environment of this table environment.
-        """
-        return StreamExecutionEnvironment(self._j_tenv.execEnv())
-
     @classmethod
     def create(cls, stream_execution_environment, table_config=None):
         gateway = get_gateway()
@@ -630,15 +596,6 @@ class BatchTableEnvironment(TableEnvironment):
         table_config._j_table_config = self._j_tenv.getConfig()
         return table_config
 
-    def query_config(self):
-        """
-        Returns the :class:`BatchQueryConfig` that holds parameters to 
configure the behavior of
-        batch queries.
-
-        :return: A new :class:`BatchQueryConfig`.
-        """
-        return BatchQueryConfig()
-
     def connect(self, connector_descriptor):
         """
         Creates a table source and/or table sink from a descriptor.
@@ -670,12 +627,6 @@ class BatchTableEnvironment(TableEnvironment):
         return BatchTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
-    def exec_env(self):
-        """
-        :return: The stream execution environment of this table environment.
-        """
-        return ExecutionEnvironment(self._j_tenv.execEnv())
-
     @classmethod
     def create(cls, execution_environment, table_config=None):
         gateway = get_gateway()
diff --git a/flink-python/pyflink/table/tests/test_calc.py 
b/flink-python/pyflink/table/tests/test_calc.py
index 3e13450..7df6e68 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -97,7 +97,7 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
               PythonOnlyPoint(3.0, 4.0))],
             schema)
         t.insert_into("Results")
-        t_env.exec_env().execute()
+        self.env.execute()
         actual = source_sink_utils.results()
 
         expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 00:00:00.0,'
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py 
b/flink-python/pyflink/table/tests/test_descriptor.py
index 7ea3d8b..1f313a7 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -956,7 +956,7 @@ class AbstractTableDescriptorTests(object):
         t_env.scan("source") \
              .select("a + 1, b, c") \
              .insert_into("sink")
-        t_env.exec_env().execute()
+        self.env.execute()
 
         with open(sink_path, 'r') as f:
             lines = f.read()
@@ -998,7 +998,7 @@ class AbstractTableDescriptorTests(object):
         t_env.scan("source") \
              .select("a + 1, b, c") \
              .insert_into("sink")
-        t_env.exec_env().execute()
+        self.env.execute()
 
         with open(sink_path, 'r') as f:
             lines = f.read()
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py 
b/flink-python/pyflink/table/tests/test_shell_example.py
index 10c2eac..97bce08 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -24,7 +24,7 @@ class ShellExampleTests(PyFlinkTestCase):
     """
 
     def test_batch_case(self):
-        from pyflink.shell import bt_env, FileSystem, OldCsv, DataTypes, Schema
+        from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, 
DataTypes, Schema
         # example begin
 
         import tempfile
@@ -36,7 +36,7 @@ class ShellExampleTests(PyFlinkTestCase):
                 os.remove(sink_path)
             else:
                 shutil.rmtree(sink_path)
-        bt_env.exec_env().set_parallelism(1)
+        b_env.set_parallelism(1)
         t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], 
['a', 'b', 'c'])
         bt_env.connect(FileSystem().path(sink_path))\
             .with_format(OldCsv()
@@ -52,7 +52,7 @@ class ShellExampleTests(PyFlinkTestCase):
 
         t.select("a + 1, b, c").insert_into("batch_sink")
 
-        bt_env.exec_env().execute()
+        b_env.execute()
 
         # verify code, do not copy these code to shell.py
         with open(sink_path, 'r') as f:
@@ -60,7 +60,7 @@ class ShellExampleTests(PyFlinkTestCase):
             self.assertEqual(lines, '2,hi,hello\n' + '3,hi,hello\n')
 
     def test_stream_case(self):
-        from pyflink.shell import st_env, FileSystem, OldCsv, DataTypes, Schema
+        from pyflink.shell import s_env, st_env, FileSystem, OldCsv, 
DataTypes, Schema
         # example begin
 
         import tempfile
@@ -72,7 +72,7 @@ class ShellExampleTests(PyFlinkTestCase):
                 os.remove(sink_path)
             else:
                 shutil.rmtree(sink_path)
-        st_env.exec_env().set_parallelism(1)
+        s_env.set_parallelism(1)
         t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], 
['a', 'b', 'c'])
         st_env.connect(FileSystem().path(sink_path))\
             .with_format(OldCsv()
@@ -88,7 +88,7 @@ class ShellExampleTests(PyFlinkTestCase):
 
         t.select("a + 1, b, c").insert_into("stream_sink")
 
-        st_env.exec_env().execute()
+        s_env.execute()
 
         # verify code, do not copy these code to shell.py
         with open(sink_path, 'r') as f:
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py 
b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 64080f1..a129d22 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,6 +22,7 @@ from py4j.compat import unicode
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamQueryConfig
 from pyflink.table.table_environment import BatchTableEnvironment, 
StreamTableEnvironment
 from pyflink.table.table_config import TableConfig
 from pyflink.table.types import DataTypes, RowType
@@ -53,7 +54,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
             source_sink_utils.TestAppendSink(field_names, field_types))
 
         t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", 
"c"]).insert_into("Sinks")
-        t_env.exec_env().execute()
+        self.env.execute()
         actual = source_sink_utils.results()
 
         expected = ['1,Hi,Hello']
@@ -114,7 +115,7 @@ class 
StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
 
         result = t_env.sql_query("select a + 1, b, c from %s" % source)
         result.insert_into("sinks")
-        t_env.exec_env().execute()
+        self.env.execute()
         actual = source_sink_utils.results()
 
         expected = ['2,Hi,Hello', '3,Hello,Hello']
@@ -130,7 +131,7 @@ class 
StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
             source_sink_utils.TestAppendSink(field_names, field_types))
 
         t_env.sql_update("insert into sinks select * from %s" % source)
-        t_env.exec_env().execute("test_sql_job")
+        self.env.execute("test_sql_job")
 
         actual = source_sink_utils.results()
         expected = ['1,Hi,Hello', '2,Hello,Hello']
@@ -144,25 +145,25 @@ class 
StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         t_env.register_table_sink(
             "sinks",
             source_sink_utils.TestAppendSink(field_names, field_types))
-        query_config = t_env.query_config()
+        query_config = StreamQueryConfig()
         query_config.with_idle_state_retention_time(
             datetime.timedelta(days=1), datetime.timedelta(days=2))
 
         t_env.sql_update("insert into sinks select * from %s" % source, 
query_config)
-        t_env.exec_env().execute("test_sql_job")
+        self.env.execute("test_sql_job")
 
         actual = source_sink_utils.results()
         expected = ['1,Hi,Hello', '2,Hello,Hello']
         self.assert_equals(actual, expected)
 
     def test_query_config(self):
-        query_config = self.t_env.query_config()
+        query_config = StreamQueryConfig()
 
         query_config.with_idle_state_retention_time(
             datetime.timedelta(days=1), datetime.timedelta(days=2))
 
-        assert query_config.get_max_idle_state_retention_time() == 2 * 24 * 
3600 * 1000
-        assert query_config.get_min_idle_state_retention_time() == 24 * 3600 * 
1000
+        self.assertEqual(2 * 24 * 3600 * 1000, 
query_config.get_max_idle_state_retention_time())
+        self.assertEqual(24 * 3600 * 1000, 
query_config.get_min_idle_state_retention_time())
 
     def test_create_table_environment(self):
         table_config = TableConfig()
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 8c56f8f..c475adf 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -39,4 +39,4 @@ commands =
 # up to 100 characters in length, not 79.
 ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504
 max-line-length=100
-exclude=.tox/*,dev/*,lib/*,target/*,build/*,pyflink/shell.py
+exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/*,pyflink/shell.py

Reply via email to the mailing list.