This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 4a3d6e5  [FLINK-26920][python] Handles the dependencies properly in 
create_temporary_view
4a3d6e5 is described below

commit 4a3d6e52fe820e53b7f3d9bf301b4a1c7d14ab41
Author: Dian Fu <[email protected]>
AuthorDate: Fri Apr 1 15:42:03 2022 +0800

    [FLINK-26920][python] Handles the dependencies properly in 
create_temporary_view
    
    This closes #19318.
---
 flink-python/pyflink/table/table_environment.py    | 45 ++++++++++++----------
 .../python/beam/BeamPythonFunctionRunner.java      |  7 ++--
 2 files changed, 29 insertions(+), 23 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py 
b/flink-python/pyflink/table/table_environment.py
index e001f36..07ed66c 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1156,27 +1156,32 @@ class TableEnvironment(object):
         """
         if isinstance(table_or_data_stream, Table):
             self._j_tenv.createTemporaryView(view_path, 
table_or_data_stream._j_table)
-        elif len(fields_or_schema) == 0:
-            self._j_tenv.createTemporaryView(view_path, 
table_or_data_stream._j_data_stream)
-        elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], 
str):
-            self._j_tenv.createTemporaryView(
-                view_path,
-                table_or_data_stream._j_data_stream,
-                fields_or_schema[0])
-        elif len(fields_or_schema) == 1 and isinstance(fields_or_schema[0], 
Schema):
-            self._j_tenv.createTemporaryView(
-                view_path,
-                table_or_data_stream._j_data_stream,
-                fields_or_schema[0]._j_schema)
-        elif (len(fields_or_schema) > 0 and
-              all(isinstance(elem, Expression) for elem in fields_or_schema)):
-            self._j_tenv.createTemporaryView(
-                view_path,
-                table_or_data_stream._j_data_stream,
-                to_expression_jarray(fields_or_schema))
         else:
-            raise ValueError("Invalid arguments for 'fields': %r" %
-                             ','.join([repr(item) for item in 
fields_or_schema]))
+            j_data_stream = table_or_data_stream._j_data_stream
+            JPythonConfigUtil = 
get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil
+            
JPythonConfigUtil.configPythonOperator(j_data_stream.getExecutionEnvironment())
+
+            if len(fields_or_schema) == 0:
+                self._j_tenv.createTemporaryView(view_path, j_data_stream)
+            elif len(fields_or_schema) == 1 and 
isinstance(fields_or_schema[0], str):
+                self._j_tenv.createTemporaryView(
+                    view_path,
+                    j_data_stream,
+                    fields_or_schema[0])
+            elif len(fields_or_schema) == 1 and 
isinstance(fields_or_schema[0], Schema):
+                self._j_tenv.createTemporaryView(
+                    view_path,
+                    j_data_stream,
+                    fields_or_schema[0]._j_schema)
+            elif (len(fields_or_schema) > 0 and
+                  all(isinstance(elem, Expression) for elem in 
fields_or_schema)):
+                self._j_tenv.createTemporaryView(
+                    view_path,
+                    j_data_stream,
+                    to_expression_jarray(fields_or_schema))
+            else:
+                raise ValueError("Invalid arguments for 'fields': %r" %
+                                 ','.join([repr(item) for item in 
fields_or_schema]))
 
     def add_python_file(self, file_path: str):
         """
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 86264ab..da4e111 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -232,9 +232,10 @@ public abstract class BeamPythonFunctionRunner implements 
PythonFunctionRunner {
         if (memoryManager != null && config.isUsingManagedMemory()) {
             Preconditions.checkArgument(
                     managedMemoryFraction > 0 && managedMemoryFraction <= 1.0,
-                    "The configured managed memory fraction for Python worker 
process must be within (0, 1], was: %s. "
-                            + "It may be because the consumer type \"Python\" 
was missing or set to 0 for the config option 
\"taskmanager.memory.managed.consumer-weights\"."
-                            + managedMemoryFraction);
+                    String.format(
+                            "The configured managed memory fraction for Python 
worker process must be within (0, 1], was: %s. "
+                                    + "It may be because the consumer type 
\"Python\" was missing or set to 0 for the config option 
\"taskmanager.memory.managed.consumer-weights\".",
+                            managedMemoryFraction));
 
             final LongFunctionWithException<PythonSharedResources, Exception> 
initializer =
                     (size) ->

Reply via email to