hequn8128 commented on a change in pull request #13155:
URL: https://github.com/apache/flink/pull/13155#discussion_r470989005
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -434,6 +434,60 @@ def broadcast(self) -> 'DataStream':
"""
return DataStream(self._j_data_stream.broadcast())
+    def partition_custom(self, partitioner: Union[Callable, Partitioner],
+                         key_selector: Union[Callable, KeySelector]) -> 'DataStream':
+ """
+ Partitions a DataStream on the key returned by the selector, using a
custom partitioner.
+ This method takes the key selector to get the key to partition on, and
a partitioner that
+ accepts the key type.
+
+ Note that this method works only on single field keys, i.e. the
selector cannet return
+ tuples of fields.
+
+ :param partitioner: The partitioner to assign partitions to keys.
+ :param key_selector: The KeySelector with which the DataStream is
partitioned.
+ :return: The partitioned DataStream.
+ """
+        if callable(key_selector):
+            key_selector = KeySelectorFunctionWrapper(key_selector)
+        if not isinstance(key_selector, (KeySelector, KeySelectorFunctionWrapper)):
+            raise TypeError("Parameter key_selector should be a type of KeySelector.")
+
+        if callable(partitioner):
+            partitioner = PartitionerFunctionWrapper(partitioner)
+        if not isinstance(partitioner, (Partitioner, PartitionerFunctionWrapper)):
+            raise TypeError("Parameter partitioner should be a type of Partitioner.")
+
+        gateway = get_gateway()
+        data_stream_num_partitions_env_key = gateway.jvm\
+            .org.apache.flink.datastream.runtime.operators.python\
+            .DataStreamPythonPartitionCustomFunctionOperator.DATA_STREAM_NUM_PARTITIONS
+
+        def partition_custom_map(value):
+            num_partitions = int(os.environ[data_stream_num_partitions_env_key])
Review comment:
Can we find ways to init `num_partitions` only once?
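For example, something along these lines (the factory function and the returned value are only illustrative; the point is the lazy one-time initialization):

```python
import os


def make_partition_custom_map(partitioner, key_selector, env_key):
    # Resolve the partition count lazily, on the first record only, instead of
    # re-reading the environment variable for every element.
    num_partitions = None

    def partition_custom_map(value):
        nonlocal num_partitions
        if num_partitions is None:
            num_partitions = int(os.environ[env_key])
        # What the map actually emits in this PR is not shown here; delegating
        # to the partitioner is just for illustration.
        return partitioner.partition(key_selector.get_key(value), num_partitions)

    return partition_custom_map
```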
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -344,7 +345,7 @@ protected PythonEnvironmentManager createPythonEnvironmentManager() throws IOExc
return new ProcessPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
- System.getenv());
+ new HashMap<>(System.getenv()));
Review comment:
Why is this change needed?
##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -304,6 +305,28 @@ def test_shuffle(self):
pre_ship_strategy = shuffle_node['predecessors'][0]['ship_strategy']
self.assertEqual(pre_ship_strategy, 'SHUFFLE')
+    def test_partition_custom(self):
+        ds = self.env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1), ('e', 2)],
Review comment:
Add a number which is greater than `expected_num_partitions`.
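For example (values are illustrative; the point is that at least one of the numbers exceeds `expected_num_partitions`, whatever value the test asserts against):

```python
# Illustrative data only: ('f', 7) assumes expected_num_partitions is smaller than 7.
test_data = [('a', 0), ('b', 0), ('c', 1), ('d', 1), ('e', 2), ('f', 7)]
```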
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -324,3 +364,16 @@ def __init__(self, sink_func: Union[str, JavaObject]):
:param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
"""
super(SinkFunction, self).__init__(sink_func)
+
+
+class PartitionCustomMapFunction(MapFunction):
Review comment:
Adding this function here would confuse users since it is not a public
Function. I think we can find other ways to determine whether a function is a
`PartitionCustomMapFunction`. For example, we can add a local `MapFunction`
returning a special `__repr__` (toString).
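A rough sketch of that idea (class name and marker string are illustrative):

```python
from pyflink.datastream.functions import MapFunction


class _PartitionCustomMapFunction(MapFunction):
    """Module-local marker MapFunction, identified by its repr rather than a public type."""

    def map(self, value):
        return value

    def __repr__(self):
        # The framework can check for this marker string instead of exposing a
        # public PartitionCustomMapFunction class to users.
        return '_Flink_PartitionCustomMapFunction'
```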
##########
File path: flink-python/src/main/java/org/apache/flink/python/env/beam/ProcessPythonEnvironmentManager.java
##########
@@ -240,6 +240,10 @@ public String createRetrievalToken() throws IOException {
return env;
}
+ public void appendEnvironmentVariable(String key, String value) {
Review comment:
Rename to `setEnvironmentVariable`. This method replaces the old value if the
key already exists.
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -154,6 +154,22 @@ def filter(self, value):
pass
+class Partitioner(Function):
+    """
+    Function to implement a custom partition assignment for keys.
+    """
+
+    @abc.abstractmethod
+    def partition(self, key, num_partitions: int) -> int:
+        """
+        Computes the partition for the given key.
Review comment:
Keep a line break.
##########
File path: flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PartitionCustomPartitioner.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.runtime.functions.python;
+
+import org.apache.flink.api.common.functions.Partitioner;
+
+/**
+ * A {@link PartitionCustomPartitioner} is a dedicated Partitioner for the Python DataStream
+ * custom partitioning operation. It directly returns the calculated key value as the partition
+ * index of the current task.
+ */
+public class PartitionCustomPartitioner implements Partitioner<Integer> {
Review comment:
We don't need this class. We can reuse
`org.apache.flink.api.java.functions.IdPartitioner`.
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -274,6 +290,30 @@ def get_key(self, value):
return self._func(value)
+class PartitionerFunctionWrapper(FunctionWrapper):
+    """
+    A wrapper class for Partitioner. It is used to wrap a user-defined function in a
+    Partitioner when the user does not implement a Partitioner but directly passes a
+    function object or a lambda function to partition_custom().
+    """
+    def __init__(self, func):
+        """
+        The constructor of PartitionerFunctionWrapper.
+
+        :param func: user defined function object.
+        """
+        super(PartitionerFunctionWrapper, self).__init__(func)
+
+    def partition(self, key, num_partitions):
Review comment:
Add type hints to this method.
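For example, mirroring the abstract `Partitioner.partition(self, key, num_partitions: int) -> int` signature shown above (standalone sketch only; the real class extends `FunctionWrapper` as in the diff, and the delegation body is assumed from the wrapper pattern):

```python
from typing import Any, Callable


class PartitionerFunctionWrapper:
    """Standalone sketch showing the suggested type hints only."""

    def __init__(self, func: Callable):
        self._func = func

    def partition(self, key: Any, num_partitions: int) -> int:
        # Delegate to the wrapped callable, as KeySelectorFunctionWrapper does for get_key.
        return self._func(key, num_partitions)
```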
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]