shuiqiangchen commented on a change in pull request #13119:
URL: https://github.com/apache/flink/pull/13119#discussion_r469004979
##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -242,6 +242,68 @@ def test_print_with_align_output(self):
self.assertEqual(3, len(plan['nodes']))
self.assertEqual("Sink: Print to Std. Out", plan['nodes'][2]['type'])
+ def test_union_stream(self):
+ ds_1 = self.env.from_collection([1, 2, 3])
+ ds_2 = self.env.from_collection([4, 5, 6])
+ ds_3 = self.env.from_collection([7, 8, 9])
+
+ united_stream = ds_3.union(ds_1, ds_2)
+
+ united_stream.map(lambda x: x + 1).add_sink(self.test_sink)
+ exec_plan = eval(self.env.get_execution_plan())
+ source_ids = []
+ union_node_pre_ids = []
+ for node in exec_plan['nodes']:
+ if node['pact'] == 'Data Source':
+ source_ids.append(node['id'])
+ if node['pact'] == 'Operator':
+ for pre in node['predecessors']:
+ union_node_pre_ids.append(pre['id'])
+
+ source_ids.sort()
+ union_node_pre_ids.sort()
+ self.assertEqual(source_ids, union_node_pre_ids)
+
+ def test_project(self):
+ ds = self.env.from_collection([[1, 2, 3, 4], [5, 6, 7, 8]],
+ type_info=Types.TUPLE(
+ [Types.INT(), Types.INT(),
Types.INT(), Types.INT()]))
+ ds.project(1, 3).map(lambda x: (x[0], x[1] +
1)).add_sink(self.test_sink)
+ exec_plan = eval(self.env.get_execution_plan())
+ self.assertEqual(exec_plan['nodes'][1]['type'], 'Projection')
+
+ def test_broadcast(self):
+ ds_1 = self.env.from_collection([1, 2, 3])
+ ds_1.broadcast().map(lambda x: x +
1).set_parallelism(3).add_sink(self.test_sink)
+ exec_plan = eval(self.env.get_execution_plan())
+ broadcast_node = exec_plan['nodes'][1]
+ pre_ship_strategy = broadcast_node['predecessors'][0]['ship_strategy']
+ self.assertEqual(pre_ship_strategy, 'BROADCAST')
+
+ def test_rebalance(self):
+ ds_1 = self.env.from_collection([1, 2, 3])
+ ds_1.rebalance().map(lambda x: x +
1).set_parallelism(3).add_sink(self.test_sink)
+ exec_plan = eval(self.env.get_execution_plan())
+ rebalance_node = exec_plan['nodes'][1]
+ pre_ship_strategy = rebalance_node['predecessors'][0]['ship_strategy']
+ self.assertEqual(pre_ship_strategy, 'REBALANCE')
+
+ def test_rescale(self):
+ ds_1 = self.env.from_collection([1, 2, 3])
+ ds_1.rescale().map(lambda x: x +
1).set_parallelism(3).add_sink(self.test_sink)
+ exec_plan = eval(self.env.get_execution_plan())
+ rescale_node = exec_plan['nodes'][1]
+ pre_ship_strategy = rescale_node['predecessors'][0]['ship_strategy']
+ self.assertEqual(pre_ship_strategy, 'RESCALE')
+
+ def test_shuffle(self):
+ ds_1 = self.env.from_collection([1, 2, 3])
+ ds_1.shuffle().map(lambda x: x +
1).set_parallelism(3).add_sink(self.test_sink)
+ exec_plan = eval(self.env.get_execution_plan())
+ shuffle_node = exec_plan['nodes'][1]
+ pre_ship_strategy = shuffle_node['predecessors'][0]['ship_strategy']
+ self.assertEqual(pre_ship_strategy, 'SHUFFLE')
Review comment:
Thank you. I have added a dedicated test for overriding the partitioning of
a KeyedStream:
```
def test_keyed_stream_partitioning(self):
ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3),
('deeefg', 4)])
keyed_stream = ds.key_by(lambda x: x[1])
with self.assertRaises(Exception):
keyed_stream.shuffle()
with self.assertRaises(Exception):
keyed_stream.rebalance()
with self.assertRaises(Exception):
keyed_stream.rescale()
with self.assertRaises(Exception):
keyed_stream.broadcast()
with self.assertRaises(Exception):
keyed_stream.forward()
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]