shuiqiangchen commented on a change in pull request #13611:
URL: https://github.com/apache/flink/pull/13611#discussion_r504755235



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -467,25 +466,26 @@ def partition_custom(self, partitioner: Union[Callable, Partitioner],
             raise TypeError("Parameter partitioner should be a type of Partitioner.")
 
         gateway = get_gateway()
-        data_stream_num_partitions_env_key = \
-            gateway.jvm.PythonPartitionCustomOperator.DATA_STREAM_NUM_PARTITIONS
 
         class PartitionCustomMapFunction(MapFunction):
             """
             A wrapper class for partition_custom map function. It indicates that it is a partition
-            custom operation that we need to apply DataStreamPythonPartitionCustomFunctionOperator
+            custom operation that we need to apply PythonPartitionCustomOperator
             to run the map function.
             """
 
             def __init__(self):
                 self.num_partitions = None
 
-            def map(self, value):
-                return self.partition_custom_map(value)
+            def open(self, runtime_context: RuntimeContext):
+                self.num_partitions = int(runtime_context.get_job_parameter(
+                    "DATA_STREAM_NUM_PARTITIONS", "-1"))
+                if self.num_partitions <= 0:
+                    raise ValueError(
+                        "The partition number should be a positive value, got %s"
+                        % self.num_partitions)

Review comment:
       It would be better to validate the partition number in PythonPartitionCustomOperator at the point where numPartitions is set, before the job is executed, rather than at runtime in open().
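
A minimal sketch of the fail-fast idea, written in Python to match the diff. PythonPartitionCustomOperator itself is a Java class, so `PartitionOperatorSketch` and `set_num_partitions` below are hypothetical stand-ins, not the operator's actual API. The point is only that the check runs when the value is assigned, so an invalid partition count is rejected while the job is being built rather than after it has been submitted.

```python
class PartitionOperatorSketch:
    """Hypothetical stand-in for the Java PythonPartitionCustomOperator."""

    def __init__(self):
        # Unset until the job graph is assembled.
        self.num_partitions = None

    def set_num_partitions(self, num_partitions):
        # Validate at assignment time so the error surfaces before execution.
        if isinstance(num_partitions, bool) or not isinstance(num_partitions, int):
            raise TypeError(
                "The partition number should be an int, got %r" % (num_partitions,))
        if num_partitions <= 0:
            raise ValueError(
                "The partition number should be a positive value, got %s"
                % num_partitions)
        self.num_partitions = num_partitions


if __name__ == "__main__":
    op = PartitionOperatorSketch()
    op.set_num_partitions(4)       # accepted
    try:
        op.set_num_partitions(0)   # rejected before any job would run
    except ValueError as err:
        print(err)
```

With the check in the setter, the wrapper's open() would only need to read the value and could drop its own validation.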



