shuiqiangchen commented on a change in pull request #13140:
URL: https://github.com/apache/flink/pull/13140#discussion_r470438448
##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -322,6 +322,88 @@ def test_keyed_stream_partitioning(self):
with self.assertRaises(Exception):
keyed_stream.forward()
+ def test_slot_sharing_group(self):
+ source_operator_name = 'collection source'
+ map_operator_name = 'map_operator'
+ slot_sharing_group_1 = 'slot_sharing_group_1'
+ slot_sharing_group_2 = 'slot_sharing_group_2'
+ ds_1 = self.env.from_collection([1, 2, 3]).name(source_operator_name)
+ ds_1.slot_sharing_group(slot_sharing_group_1).map(lambda x: x +
1).set_parallelism(3)\
+ .name(map_operator_name).slot_sharing_group(slot_sharing_group_2)\
+ .add_sink(self.test_sink)
+
+ j_generated_stream_graph = self.env._j_stream_execution_environment \
+ .getStreamGraph("test start new_chain", True)
+
+ j_stream_nodes =
list(j_generated_stream_graph.getStreamNodes().toArray())
+ for j_stream_node in j_stream_nodes:
+ if j_stream_node.getOperatorName() == source_operator_name:
+ self.assertEqual(j_stream_node.getSlotSharingGroup(),
slot_sharing_group_1)
+ elif j_stream_node.getOperatorName() == map_operator_name:
+ self.assertEqual(j_stream_node.getSlotSharingGroup(),
slot_sharing_group_2)
+
+ def test_chaining_strategy(self):
+ chained_operator_name_0 = "map_operator_0"
+ chained_operator_name_1 = "map_operator_1"
+ chained_operator_name_2 = "map_operator_2"
+
+ ds = self.env.from_collection([1, 2, 3])
+ ds.map(lambda x: x).set_parallelism(2).name(chained_operator_name_0)\
+ .map(lambda x: x).set_parallelism(2).name(chained_operator_name_1)\
+ .map(lambda x: x).set_parallelism(2).name(chained_operator_name_2)\
+ .add_sink(self.test_sink)
+
+ def assert_chainable(j_stream_graph, expected_upstream_chainable,
+ expected_downstream_chainable):
+ j_stream_nodes = list(j_stream_graph.getStreamNodes().toArray())
+ for j_stream_node in j_stream_nodes:
+ if j_stream_node.getOperatorName() == chained_operator_name_1:
+ JStreamingJobGraphGenerator = get_gateway().jvm \
+
.org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
+
+ j_in_stream_edge = j_stream_node.getInEdges().get(0)
+ upstream_chainable =
JStreamingJobGraphGenerator.isChainable(j_in_stream_edge,
+
j_stream_graph)
+ self.assertEqual(expected_upstream_chainable,
upstream_chainable)
+
+ j_out_stream_edge = j_stream_node.getOutEdges().get(0)
+ downstream_chainable =
JStreamingJobGraphGenerator.isChainable(
+ j_out_stream_edge, j_stream_graph)
+ self.assertEqual(expected_downstream_chainable,
downstream_chainable)
+
+ # map_operator_1 has the same parallelism as the source operator and map_operator_2, and
+ # the ship_strategy between the collection source and map_operator_1 is FORWARD, so
+ # map_operator_1 can be chained with the collection source and map_operator_2.
Review comment:
Thank you, I will update it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]