hequn8128 commented on a change in pull request #13119:
URL: https://github.com/apache/flink/pull/13119#discussion_r468972272



##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -563,6 +654,27 @@ def key_by(self, key_selector: Union[Callable, 
KeySelector],
                key_type_info: TypeInformation = None) -> 'KeyedStream':
         return self._origin_stream.key_by(key_selector, key_type_info)
 
+    def union(self, *streams) -> 'DataStream':
+        return self._values().union(*streams)
+
+    def shuffle(self) -> 'DataStream':
+        return self._origin_stream.shuffle()
+
+    def project(self, *field_indexes) -> 'DataStream':

Review comment:
       project on Keyedstream should preserve the hash partitioning, i.e., we 
can't simply project on its origin_stream.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -563,6 +654,27 @@ def key_by(self, key_selector: Union[Callable, 
KeySelector],
                key_type_info: TypeInformation = None) -> 'KeyedStream':
         return self._origin_stream.key_by(key_selector, key_type_info)
 
+    def union(self, *streams) -> 'DataStream':
+        return self._values().union(*streams)

Review comment:
       I find we have bugs when handling parallelism for the keyby. Suppose we 
perform x -> key_by-> map, the internal java stream graph would be x -> map1 -> 
keyBy -> map2 -> map3. The parallelism of map1 should equal to x and map2 
should equal to map3. 

##########
File path: flink-python/pyflink/datastream/tests/test_data_stream.py
##########
@@ -242,6 +242,68 @@ def test_print_with_align_output(self):
         self.assertEqual(3, len(plan['nodes']))
         self.assertEqual("Sink: Print to Std. Out", plan['nodes'][2]['type'])
 
+    def test_union_stream(self):
+        ds_1 = self.env.from_collection([1, 2, 3])
+        ds_2 = self.env.from_collection([4, 5, 6])
+        ds_3 = self.env.from_collection([7, 8, 9])
+
+        united_stream = ds_3.union(ds_1, ds_2)
+
+        united_stream.map(lambda x: x + 1).add_sink(self.test_sink)
+        exec_plan = eval(self.env.get_execution_plan())
+        source_ids = []
+        union_node_pre_ids = []
+        for node in exec_plan['nodes']:
+            if node['pact'] == 'Data Source':
+                source_ids.append(node['id'])
+            if node['pact'] == 'Operator':
+                for pre in node['predecessors']:
+                    union_node_pre_ids.append(pre['id'])
+
+        source_ids.sort()
+        union_node_pre_ids.sort()
+        self.assertEqual(source_ids, union_node_pre_ids)
+
+    def test_project(self):
+        ds = self.env.from_collection([[1, 2, 3, 4], [5, 6, 7, 8]],
+                                      type_info=Types.TUPLE(
+                                          [Types.INT(), Types.INT(), 
Types.INT(), Types.INT()]))
+        ds.project(1, 3).map(lambda x: (x[0], x[1] + 
1)).add_sink(self.test_sink)
+        exec_plan = eval(self.env.get_execution_plan())
+        self.assertEqual(exec_plan['nodes'][1]['type'], 'Projection')
+
+    def test_broadcast(self):
+        ds_1 = self.env.from_collection([1, 2, 3])
+        ds_1.broadcast().map(lambda x: x + 
1).set_parallelism(3).add_sink(self.test_sink)
+        exec_plan = eval(self.env.get_execution_plan())
+        broadcast_node = exec_plan['nodes'][1]
+        pre_ship_strategy = broadcast_node['predecessors'][0]['ship_strategy']
+        self.assertEqual(pre_ship_strategy, 'BROADCAST')
+
+    def test_rebalance(self):
+        ds_1 = self.env.from_collection([1, 2, 3])
+        ds_1.rebalance().map(lambda x: x + 
1).set_parallelism(3).add_sink(self.test_sink)
+        exec_plan = eval(self.env.get_execution_plan())
+        rebalance_node = exec_plan['nodes'][1]
+        pre_ship_strategy = rebalance_node['predecessors'][0]['ship_strategy']
+        self.assertEqual(pre_ship_strategy, 'REBALANCE')
+
+    def test_rescale(self):
+        ds_1 = self.env.from_collection([1, 2, 3])
+        ds_1.rescale().map(lambda x: x + 
1).set_parallelism(3).add_sink(self.test_sink)
+        exec_plan = eval(self.env.get_execution_plan())
+        rescale_node = exec_plan['nodes'][1]
+        pre_ship_strategy = rescale_node['predecessors'][0]['ship_strategy']
+        self.assertEqual(pre_ship_strategy, 'RESCALE')
+
+    def test_shuffle(self):
+        ds_1 = self.env.from_collection([1, 2, 3])
+        ds_1.shuffle().map(lambda x: x + 
1).set_parallelism(3).add_sink(self.test_sink)
+        exec_plan = eval(self.env.get_execution_plan())
+        shuffle_node = exec_plan['nodes'][1]
+        pre_ship_strategy = shuffle_node['predecessors'][0]['ship_strategy']
+        self.assertEqual(pre_ship_strategy, 'SHUFFLE')

Review comment:
       These tests are not correct. Exceptions should be thrown since we cannot 
override partitioning for KeyedStream.

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -563,6 +654,27 @@ def key_by(self, key_selector: Union[Callable, 
KeySelector],
                key_type_info: TypeInformation = None) -> 'KeyedStream':
         return self._origin_stream.key_by(key_selector, key_type_info)
 
+    def union(self, *streams) -> 'DataStream':
+        return self._values().union(*streams)
+
+    def shuffle(self) -> 'DataStream':
+        return self._origin_stream.shuffle()
+
+    def project(self, *field_indexes) -> 'DataStream':
+        return self._origin_stream.project(*field_indexes)
+
+    def rescale(self) -> 'DataStream':
+        return self._origin_stream.rescale()
+
+    def rebalance(self) -> 'DataStream':
+        return self._origin_stream.rebalance()
+
+    def forward(self) -> 'DataStream':
+        return self._origin_stream.forward()
+
+    def broadcast(self) -> 'DataStream':
+        return self._origin_stream.broadcast()

Review comment:
       Cannot override partitioning for KeyedStream.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to