dianfu commented on a change in pull request #13611:
URL: https://github.com/apache/flink/pull/13611#discussion_r505107742



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -467,25 +466,26 @@ def partition_custom(self, partitioner: Union[Callable, 
Partitioner],
             raise TypeError("Parameter partitioner should be a type of 
Partitioner.")
 
         gateway = get_gateway()
-        data_stream_num_partitions_env_key = \
-            
gateway.jvm.PythonPartitionCustomOperator.DATA_STREAM_NUM_PARTITIONS
 
         class PartitionCustomMapFunction(MapFunction):
             """
             A wrapper class for partition_custom map function. It indicates 
that it is a partition
-            custom operation that we need to apply 
DataStreamPythonPartitionCustomFunctionOperator
+            custom operation that we need to apply 
PythonPartitionCustomOperator
             to run the map function.
             """
 
             def __init__(self):
                 self.num_partitions = None
 
-            def map(self, value):
-                return self.partition_custom_map(value)
+            def open(self, runtime_context: RuntimeContext):
+                self.num_partitions = int(runtime_context.get_job_parameter(
+                    "DATA_STREAM_NUM_PARTITIONS", "-1"))
+                if self.num_partitions <= 0:
+                    raise ValueError(
+                        "The partition number should be a positive value, got 
%s"
+                        % self.num_partitions)

Review comment:
       In PythonPartitionCustomOperator, the numPartitions will always be 
positive as it's the parallelism. So the validation there is not necessary. The 
validation here checks that the numPartitions is transmitted to the Python 
user-defined function correctly. Otherwise, it will be -1. Thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to