This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2d7e64a [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
2d7e64a is described below
commit 2d7e64aa481ad03069ecdbdffbc8a22254e94d72
Author: Dian Fu <[email protected]>
AuthorDate: Thu Jun 27 15:47:50 2019 +0800
[hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
This closes #8910
---
flink-python/dev/pip_test_code.py | 6 +--
.../dataset/tests/test_execution_environment.py | 2 +-
.../tests/test_stream_execution_environment.py | 2 +-
flink-python/pyflink/shell.py | 37 +++++++++-------
.../pyflink/table/examples/batch/word_count.py | 10 +++--
flink-python/pyflink/table/table.py | 2 +-
flink-python/pyflink/table/table_environment.py | 51 +---------------------
flink-python/pyflink/table/tests/test_calc.py | 2 +-
.../pyflink/table/tests/test_descriptor.py | 4 +-
.../pyflink/table/tests/test_shell_example.py | 12 ++---
.../table/tests/test_table_environment_api.py | 17 ++++----
flink-python/tox.ini | 2 +-
12 files changed, 53 insertions(+), 94 deletions(-)
diff --git a/flink-python/dev/pip_test_code.py b/flink-python/dev/pip_test_code.py
index c9a4798..29f9841 100644
--- a/flink-python/dev/pip_test_code.py
+++ b/flink-python/dev/pip_test_code.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
# test pyflink shell environment
-from pyflink.shell import bt_env, FileSystem, OldCsv, DataTypes, Schema
+from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
import tempfile
import os
@@ -28,7 +28,7 @@ if os.path.exists(sink_path):
os.remove(sink_path)
else:
shutil.rmtree(sink_path)
-bt_env.exec_env().set_parallelism(1)
+b_env.set_parallelism(1)
t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b',
'c'])
bt_env.connect(FileSystem().path(sink_path)) \
.with_format(OldCsv()
@@ -44,7 +44,7 @@ bt_env.connect(FileSystem().path(sink_path)) \
t.select("a + 1, b, c").insert_into("batch_sink")
-bt_env.exec_env().execute()
+b_env.execute()
with open(sink_path, 'r') as f:
lines = f.read()
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment.py b/flink-python/pyflink/dataset/tests/test_execution_environment.py
index 7adac00..9dbed96 100644
--- a/flink-python/pyflink/dataset/tests/test_execution_environment.py
+++ b/flink-python/pyflink/dataset/tests/test_execution_environment.py
@@ -110,6 +110,6 @@ class ExecutionEnvironmentTests(PyFlinkTestCase):
CsvTableSink(field_names, field_types, tmp_csv))
t_env.scan("Orders").insert_into("Results")
- plan = t_env.exec_env().get_execution_plan()
+ plan = self.env.get_execution_plan()
json.loads(plan)
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 43768fa..b02f660 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -187,6 +187,6 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
CsvTableSink(field_names, field_types, tmp_csv))
t_env.scan("Orders").insert_into("Results")
- plan = t_env.exec_env().get_execution_plan()
+ plan = self.env.get_execution_plan()
json.loads(plan)
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index 2ea3a6b..97b7065 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -20,8 +20,9 @@ import codecs
import platform
import sys
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.common import *
+from pyflink.dataset import *
+from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.catalog import *
from pyflink.table.descriptors import *
@@ -80,7 +81,7 @@ welcome_msg = u'''
NOTE: Use the prebound Table Environment to implement batch or streaming Table
programs.
- Batch - Use the 'bt_env' variable
+ Batch - Use 'b_env' and 'bt_env' variables
*
* import tempfile
@@ -92,25 +93,25 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
* os.remove(sink_path)
* else:
* shutil.rmtree(sink_path)
- * bt_env.exec_env().set_parallelism(1)
+ * b_env.set_parallelism(1)
* t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a',
'b', 'c'])
- * bt_env.connect(FileSystem().path(sink_path))\
+ * bt_env.connect(FileSystem().path(sink_path)) \\
* .with_format(OldCsv()
* .field_delimiter(',')
* .field("a", DataTypes.BIGINT())
* .field("b", DataTypes.STRING())
- * .field("c", DataTypes.STRING()))\
+ * .field("c", DataTypes.STRING())) \\
* .with_schema(Schema()
* .field("a", DataTypes.BIGINT())
* .field("b", DataTypes.STRING())
- * .field("c", DataTypes.STRING()))\
+ * .field("c", DataTypes.STRING())) \\
* .register_table_sink("batch_sink")
*
* t.select("a + 1, b, c").insert_into("batch_sink")
*
- * bt_env.exec_env().execute()
+ * b_env.execute()
- Streaming - Use the 'st_env' variable
+ Streaming - Use 's_env' and 'st_env' variables
*
* import tempfile
@@ -122,26 +123,30 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
* os.remove(sink_path)
* else:
* shutil.rmtree(sink_path)
- * st_env.exec_env().set_parallelism(1)
+ * s_env.set_parallelism(1)
* t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a',
'b', 'c'])
- * st_env.connect(FileSystem().path(sink_path))\\
+ * st_env.connect(FileSystem().path(sink_path)) \\
* .with_format(OldCsv()
* .field_delimiter(',')
* .field("a", DataTypes.BIGINT())
* .field("b", DataTypes.STRING())
- * .field("c", DataTypes.STRING()))\\
+ * .field("c", DataTypes.STRING())) \\
* .with_schema(Schema()
* .field("a", DataTypes.BIGINT())
* .field("b", DataTypes.STRING())
- * .field("c", DataTypes.STRING()))\\
+ * .field("c", DataTypes.STRING())) \\
* .register_table_sink("stream_sink")
*
* t.select("a + 1, b, c").insert_into("stream_sink")
*
- * st_env.exec_env().execute()
+ * s_env.execute()
'''
utf8_out.write(welcome_msg)
-bt_env = BatchTableEnvironment.create(ExecutionEnvironment.get_execution_environment())
+b_env = ExecutionEnvironment.get_execution_environment()
-st_env = StreamTableEnvironment.create(StreamExecutionEnvironment.get_execution_environment())
+bt_env = BatchTableEnvironment.create(b_env)
+
+s_env = StreamExecutionEnvironment.get_execution_environment()
+
+st_env = StreamTableEnvironment.create(s_env)
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
index 721e002..c3bc1e2 100644
--- a/flink-python/pyflink/table/examples/batch/word_count.py
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -21,7 +21,8 @@ import shutil
import sys
import tempfile
-from pyflink.table import TableConfig, TableEnvironment
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import BatchTableEnvironment, TableConfig
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from pyflink.table.types import DataTypes
@@ -35,8 +36,9 @@ def word_count():
"License you may not use this file except in compliance " \
"with the License"
- t_config = TableConfig.Builder().as_batch_execution().build()
- t_env = TableEnvironment.create(t_config)
+ t_config = TableConfig()
+ env = ExecutionEnvironment.get_execution_environment()
+ t_env = BatchTableEnvironment.create(env, t_config)
# register Results table in table environment
tmp_dir = tempfile.gettempdir()
@@ -68,7 +70,7 @@ def word_count():
.select("word, count(1) as count") \
.insert_into("Results")
- t_env.exec_env().execute()
+ env.execute()
if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index a02daf2..508ea17 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -52,7 +52,7 @@ class Table(object):
...
>>> t_env.register_table_sink("result", ...)
>>> t.insert_into("result")
- >>> t_env.exec_env().execute()
+ >>> env.execute()
Operations such as :func:`~pyflink.table.Table.join`,
:func:`~pyflink.table.Table.select`,
:func:`~pyflink.table.Table.where` and
:func:`~pyflink.table.Table.group_by`
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index bfb7f32..9c4eabe 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -19,11 +19,9 @@ import os
import tempfile
from abc import ABCMeta, abstractmethod
-from pyflink.dataset import ExecutionEnvironment
from pyflink.serializers import BatchedSerializer, PickleSerializer
from pyflink.table.catalog import Catalog
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table.query_config import StreamQueryConfig, BatchQueryConfig, QueryConfig
+from pyflink.table.query_config import QueryConfig
from pyflink.table.table_config import TableConfig
from pyflink.table.descriptors import (StreamTableDescriptor,
ConnectorDescriptor,
BatchTableDescriptor)
@@ -383,13 +381,6 @@ class TableEnvironment(object):
self._j_tenv.useDatabase(database_name)
@abstractmethod
- def exec_env(self):
- """
- :return: The execution environment of this table environment.
- """
- pass
-
- @abstractmethod
def get_config(self):
"""
Returns the table config to define the runtime behavior of the Table
API.
@@ -399,16 +390,6 @@ class TableEnvironment(object):
pass
@abstractmethod
- def query_config(self):
- """
- Returns a :class:`StreamQueryConfig` that holds parameters to
configure the behavior of
- streaming queries.
-
- :return: A new :class:`StreamQueryConfig` or :class:`BatchQueryConfig`.
- """
- pass
-
- @abstractmethod
def connect(self, connector_descriptor):
"""
Creates a table source and/or table sink from a descriptor.
@@ -548,15 +529,6 @@ class StreamTableEnvironment(TableEnvironment):
table_config._j_table_config = self._j_tenv.getConfig()
return table_config
- def query_config(self):
- """
- Returns a :class:`StreamQueryConfig` that holds parameters to
configure the behavior of
- streaming queries.
-
- :return: A new :class:`StreamQueryConfig`.
- """
- return StreamQueryConfig()
-
def connect(self, connector_descriptor):
"""
Creates a table source and/or table sink from a descriptor.
@@ -588,12 +560,6 @@ class StreamTableEnvironment(TableEnvironment):
return StreamTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
- def exec_env(self):
- """
- :return: The stream execution environment of this table environment.
- """
- return StreamExecutionEnvironment(self._j_tenv.execEnv())
-
@classmethod
def create(cls, stream_execution_environment, table_config=None):
gateway = get_gateway()
@@ -630,15 +596,6 @@ class BatchTableEnvironment(TableEnvironment):
table_config._j_table_config = self._j_tenv.getConfig()
return table_config
- def query_config(self):
- """
- Returns the :class:`BatchQueryConfig` that holds parameters to
configure the behavior of
- batch queries.
-
- :return: A new :class:`BatchQueryConfig`.
- """
- return BatchQueryConfig()
-
def connect(self, connector_descriptor):
"""
Creates a table source and/or table sink from a descriptor.
@@ -670,12 +627,6 @@ class BatchTableEnvironment(TableEnvironment):
return BatchTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
- def exec_env(self):
- """
- :return: The stream execution environment of this table environment.
- """
- return ExecutionEnvironment(self._j_tenv.execEnv())
-
@classmethod
def create(cls, execution_environment, table_config=None):
gateway = get_gateway()
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index 3e13450..7df6e68 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -97,7 +97,7 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
PythonOnlyPoint(3.0, 4.0))],
schema)
t.insert_into("Results")
- t_env.exec_env().execute()
+ self.env.execute()
actual = source_sink_utils.results()
expected = ['1,1.0,hi,hello,1970-01-02,01:00:00,1970-01-02 00:00:00.0,'
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 7ea3d8b..1f313a7 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -956,7 +956,7 @@ class AbstractTableDescriptorTests(object):
t_env.scan("source") \
.select("a + 1, b, c") \
.insert_into("sink")
- t_env.exec_env().execute()
+ self.env.execute()
with open(sink_path, 'r') as f:
lines = f.read()
@@ -998,7 +998,7 @@ class AbstractTableDescriptorTests(object):
t_env.scan("source") \
.select("a + 1, b, c") \
.insert_into("sink")
- t_env.exec_env().execute()
+ self.env.execute()
with open(sink_path, 'r') as f:
lines = f.read()
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py b/flink-python/pyflink/table/tests/test_shell_example.py
index 10c2eac..97bce08 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -24,7 +24,7 @@ class ShellExampleTests(PyFlinkTestCase):
"""
def test_batch_case(self):
- from pyflink.shell import bt_env, FileSystem, OldCsv, DataTypes, Schema
+ from pyflink.shell import b_env, bt_env, FileSystem, OldCsv,
DataTypes, Schema
# example begin
import tempfile
@@ -36,7 +36,7 @@ class ShellExampleTests(PyFlinkTestCase):
os.remove(sink_path)
else:
shutil.rmtree(sink_path)
- bt_env.exec_env().set_parallelism(1)
+ b_env.set_parallelism(1)
t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
['a', 'b', 'c'])
bt_env.connect(FileSystem().path(sink_path))\
.with_format(OldCsv()
@@ -52,7 +52,7 @@ class ShellExampleTests(PyFlinkTestCase):
t.select("a + 1, b, c").insert_into("batch_sink")
- bt_env.exec_env().execute()
+ b_env.execute()
# verify code, do not copy these code to shell.py
with open(sink_path, 'r') as f:
@@ -60,7 +60,7 @@ class ShellExampleTests(PyFlinkTestCase):
self.assertEqual(lines, '2,hi,hello\n' + '3,hi,hello\n')
def test_stream_case(self):
- from pyflink.shell import st_env, FileSystem, OldCsv, DataTypes, Schema
+ from pyflink.shell import s_env, st_env, FileSystem, OldCsv,
DataTypes, Schema
# example begin
import tempfile
@@ -72,7 +72,7 @@ class ShellExampleTests(PyFlinkTestCase):
os.remove(sink_path)
else:
shutil.rmtree(sink_path)
- st_env.exec_env().set_parallelism(1)
+ s_env.set_parallelism(1)
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
['a', 'b', 'c'])
st_env.connect(FileSystem().path(sink_path))\
.with_format(OldCsv()
@@ -88,7 +88,7 @@ class ShellExampleTests(PyFlinkTestCase):
t.select("a + 1, b, c").insert_into("stream_sink")
- st_env.exec_env().execute()
+ s_env.execute()
# verify code, do not copy these code to shell.py
with open(sink_path, 'r') as f:
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 64080f1..a129d22 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,6 +22,7 @@ from py4j.compat import unicode
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamQueryConfig
from pyflink.table.table_environment import BatchTableEnvironment,
StreamTableEnvironment
from pyflink.table.table_config import TableConfig
from pyflink.table.types import DataTypes, RowType
@@ -53,7 +54,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
source_sink_utils.TestAppendSink(field_names, field_types))
t_env.from_elements([(1, "Hi", "Hello")], ["a", "b",
"c"]).insert_into("Sinks")
- t_env.exec_env().execute()
+ self.env.execute()
actual = source_sink_utils.results()
expected = ['1,Hi,Hello']
@@ -114,7 +115,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
result = t_env.sql_query("select a + 1, b, c from %s" % source)
result.insert_into("sinks")
- t_env.exec_env().execute()
+ self.env.execute()
actual = source_sink_utils.results()
expected = ['2,Hi,Hello', '3,Hello,Hello']
@@ -130,7 +131,7 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
source_sink_utils.TestAppendSink(field_names, field_types))
t_env.sql_update("insert into sinks select * from %s" % source)
- t_env.exec_env().execute("test_sql_job")
+ self.env.execute("test_sql_job")
actual = source_sink_utils.results()
expected = ['1,Hi,Hello', '2,Hello,Hello']
@@ -144,25 +145,25 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
t_env.register_table_sink(
"sinks",
source_sink_utils.TestAppendSink(field_names, field_types))
- query_config = t_env.query_config()
+ query_config = StreamQueryConfig()
query_config.with_idle_state_retention_time(
datetime.timedelta(days=1), datetime.timedelta(days=2))
t_env.sql_update("insert into sinks select * from %s" % source,
query_config)
- t_env.exec_env().execute("test_sql_job")
+ self.env.execute("test_sql_job")
actual = source_sink_utils.results()
expected = ['1,Hi,Hello', '2,Hello,Hello']
self.assert_equals(actual, expected)
def test_query_config(self):
- query_config = self.t_env.query_config()
+ query_config = StreamQueryConfig()
query_config.with_idle_state_retention_time(
datetime.timedelta(days=1), datetime.timedelta(days=2))
- assert query_config.get_max_idle_state_retention_time() == 2 * 24 *
3600 * 1000
- assert query_config.get_min_idle_state_retention_time() == 24 * 3600 *
1000
+ self.assertEqual(2 * 24 * 3600 * 1000,
query_config.get_max_idle_state_retention_time())
+ self.assertEqual(24 * 3600 * 1000,
query_config.get_min_idle_state_retention_time())
def test_create_table_environment(self):
table_config = TableConfig()
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 8c56f8f..c475adf 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -39,4 +39,4 @@ commands =
# up to 100 characters in length, not 79.
ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504
max-line-length=100
-exclude=.tox/*,dev/*,lib/*,target/*,build/*,pyflink/shell.py
+exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/*,pyflink/shell.py