dianfu commented on a change in pull request #13611:
URL: https://github.com/apache/flink/pull/13611#discussion_r505107742
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -467,25 +466,26 @@ def partition_custom(self, partitioner: Union[Callable, Partitioner],
             raise TypeError("Parameter partitioner should be a type of Partitioner.")
         gateway = get_gateway()
-        data_stream_num_partitions_env_key = \
-            gateway.jvm.PythonPartitionCustomOperator.DATA_STREAM_NUM_PARTITIONS
         class PartitionCustomMapFunction(MapFunction):
             """
             A wrapper class for partition_custom map function. It indicates that it is a partition
-            custom operation that we need to apply DataStreamPythonPartitionCustomFunctionOperator
+            custom operation that we need to apply PythonPartitionCustomOperator
             to run the map function.
             """
             def __init__(self):
                 self.num_partitions = None
-            def map(self, value):
-                return self.partition_custom_map(value)
+            def open(self, runtime_context: RuntimeContext):
+                self.num_partitions = int(runtime_context.get_job_parameter(
+                    "DATA_STREAM_NUM_PARTITIONS", "-1"))
+                if self.num_partitions <= 0:
+                    raise ValueError(
+                        "The partition number should be a positive value, got %s"
+                        % self.num_partitions)
Review comment:
In PythonPartitionCustomOperator, numPartitions is always positive because it is the
operator's parallelism, so validating it there is unnecessary. The check here serves a
different purpose: it verifies that numPartitions was actually transmitted to the Python
user-defined function. If it was not, get_job_parameter falls back to the default "-1" and
the check fails. Thoughts?
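To make the argument concrete, here is a minimal sketch of the open() check that runs without PyFlink. StubRuntimeContext and PartitionCustomMapFunctionSketch are hypothetical names invented for illustration; the stub assumes only what the diff implies, namely that get_job_parameter(key, default) returns the default string when the key was never set. It shows that a missing parameter surfaces as -1 and is rejected, while a positive value (such as the operator's parallelism) passes.

```python
from typing import Dict


class StubRuntimeContext:
    """Hypothetical stand-in for PyFlink's RuntimeContext; it exposes only
    get_job_parameter(key, default), the one call the diff relies on."""

    def __init__(self, job_parameters: Dict[str, str]):
        self._job_parameters = job_parameters

    def get_job_parameter(self, key: str, default_value: str) -> str:
        # Assumed behavior: return the default when the key was never set.
        return self._job_parameters.get(key, default_value)


class PartitionCustomMapFunctionSketch:
    """Hypothetical copy of the open() logic from the diff, minus PyFlink."""

    def __init__(self):
        self.num_partitions = None

    def open(self, runtime_context) -> None:
        # "-1" acts as a sentinel meaning "the parameter never arrived".
        self.num_partitions = int(runtime_context.get_job_parameter(
            "DATA_STREAM_NUM_PARTITIONS", "-1"))
        if self.num_partitions <= 0:
            raise ValueError(
                "The partition number should be a positive value, got %s"
                % self.num_partitions)


# Parameter transmitted correctly: the parallelism arrives as a string.
ok = PartitionCustomMapFunctionSketch()
ok.open(StubRuntimeContext({"DATA_STREAM_NUM_PARTITIONS": "4"}))
print(ok.num_partitions)  # 4

# Parameter missing: the default -1 trips the check.
broken = PartitionCustomMapFunctionSketch()
try:
    broken.open(StubRuntimeContext({}))
except ValueError as e:
    print(e)  # The partition number should be a positive value, got -1
```

Because the Java side never produces a non-positive parallelism, the only realistic way to hit this error is a transmission failure, which is exactly the case the check is meant to surface.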
----------------------------------------------------------------
This is an automated message from the Apache Git Service.