This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 73d140e [FLINK-26920][python] Handles the dependencies properly in
create_temporary_view
73d140e is described below
commit 73d140e3544f65f759bf6c7ca3a7163702386bfc
Author: Dian Fu <[email protected]>
AuthorDate: Fri Apr 1 15:42:03 2022 +0800
[FLINK-26920][python] Handles the dependencies properly in
create_temporary_view
This closes #19318.
---
flink-python/pyflink/table/table_environment.py | 45 ++++++++++++----------
.../python/beam/BeamPythonFunctionRunner.java | 7 ++--
2 files changed, 29 insertions(+), 23 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py
b/flink-python/pyflink/table/table_environment.py
index b7fccff..faf7352 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1281,27 +1281,32 @@ class TableEnvironment(object):
"""
if isinstance(table_or_data_stream, Table):
self._j_tenv.createTemporaryView(view_path,
table_or_data_stream._j_table)
- elif len(fields_or_schema) == 0:
- self._j_tenv.createTemporaryView(view_path,
table_or_data_stream._j_data_stream)
- elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0],
str):
- self._j_tenv.createTemporaryView(
- view_path,
- table_or_data_stream._j_data_stream,
- fields_or_schema[0])
- elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0],
Schema):
- self._j_tenv.createTemporaryView(
- view_path,
- table_or_data_stream._j_data_stream,
- fields_or_schema[0]._j_schema)
- elif (len(fields_or_schema) > 0 and
- all(isinstance(elem, Expression) for elem in fields_or_schema)):
- self._j_tenv.createTemporaryView(
- view_path,
- table_or_data_stream._j_data_stream,
- to_expression_jarray(fields_or_schema))
else:
- raise ValueError("Invalid arguments for 'fields': %r" %
- ','.join([repr(item) for item in
fields_or_schema]))
+ j_data_stream = table_or_data_stream._j_data_stream
+ JPythonConfigUtil =
get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil
+
JPythonConfigUtil.configPythonOperator(j_data_stream.getExecutionEnvironment())
+
+ if len(fields_or_schema) == 0:
+ self._j_tenv.createTemporaryView(view_path, j_data_stream)
+ elif len(fields_or_schema) == 1 and
isinstance(fields_or_schema[0], str):
+ self._j_tenv.createTemporaryView(
+ view_path,
+ j_data_stream,
+ fields_or_schema[0])
+ elif len(fields_or_schema) == 1 and
isinstance(fields_or_schema[0], Schema):
+ self._j_tenv.createTemporaryView(
+ view_path,
+ j_data_stream,
+ fields_or_schema[0]._j_schema)
+ elif (len(fields_or_schema) > 0 and
+ all(isinstance(elem, Expression) for elem in
fields_or_schema)):
+ self._j_tenv.createTemporaryView(
+ view_path,
+ j_data_stream,
+ to_expression_jarray(fields_or_schema))
+ else:
+ raise ValueError("Invalid arguments for 'fields': %r" %
+ ','.join([repr(item) for item in
fields_or_schema]))
def add_python_file(self, file_path: str):
"""
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index ed8793c..5981687 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -232,9 +232,10 @@ public abstract class BeamPythonFunctionRunner implements
PythonFunctionRunner {
if (memoryManager != null && config.isUsingManagedMemory()) {
Preconditions.checkArgument(
managedMemoryFraction > 0 && managedMemoryFraction <= 1.0,
- "The configured managed memory fraction for Python worker
process must be within (0, 1], was: %s. "
- + "It may be because the consumer type \"Python\"
was missing or set to 0 for the config option
\"taskmanager.memory.managed.consumer-weights\"."
- + managedMemoryFraction);
+ String.format(
+ "The configured managed memory fraction for Python
worker process must be within (0, 1], was: %s. "
+ + "It may be because the consumer type
\"Python\" was missing or set to 0 for the config option
\"taskmanager.memory.managed.consumer-weights\".",
+ managedMemoryFraction));
final LongFunctionWithException<PythonSharedResources, Exception>
initializer =
(size) ->