shuiqiangchen commented on a change in pull request #13611:
URL: https://github.com/apache/flink/pull/13611#discussion_r504755235
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -467,25 +466,26 @@ def partition_custom(self, partitioner: Union[Callable, Partitioner],
             raise TypeError("Parameter partitioner should be a type of Partitioner.")
         gateway = get_gateway()
-        data_stream_num_partitions_env_key = \
-            gateway.jvm.PythonPartitionCustomOperator.DATA_STREAM_NUM_PARTITIONS
         class PartitionCustomMapFunction(MapFunction):
             """
             A wrapper class for partition_custom map function. It indicates that it is a partition
-            custom operation that we need to apply DataStreamPythonPartitionCustomFunctionOperator
+            custom operation that we need to apply PythonPartitionCustomOperator
             to run the map function.
             """
             def __init__(self):
                 self.num_partitions = None
-            def map(self, value):
-                return self.partition_custom_map(value)
+            def open(self, runtime_context: RuntimeContext):
+                self.num_partitions = int(runtime_context.get_job_parameter(
+                    "DATA_STREAM_NUM_PARTITIONS", "-1"))
+                if self.num_partitions <= 0:
+                    raise ValueError(
+                        "The partition number should be a positive value, got %s"
+                        % self.num_partitions)
Review comment:
It would be better to do parameter validation in
PythonPartitionCustomOperator when setting the numPartitions before executing
the job.
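
To illustrate the suggestion, here is a minimal sketch of validating at set time. It is written in Python only to match the file under review; `PartitionCustomOperatorSketch` and `set_num_partitions` are hypothetical names for illustration, not the actual PythonPartitionCustomOperator API.

```python
class PartitionCustomOperatorSketch:
    """Hypothetical stand-in for the operator; not Flink's actual class."""

    def __init__(self):
        self._num_partitions = None

    @property
    def num_partitions(self):
        return self._num_partitions

    def set_num_partitions(self, num_partitions: int) -> None:
        # Reject invalid values when the job is being built, so the error
        # surfaces before execution instead of inside open() at runtime.
        if num_partitions <= 0:
            raise ValueError(
                "The partition number should be a positive value, got %s"
                % num_partitions)
        self._num_partitions = num_partitions
```

With a check like this in the setter, the `open()` method in the map function would presumably only need to read the value.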