dianfu commented on a change in pull request #12902:
URL: https://github.com/apache/flink/pull/12902#discussion_r490091561



##########
File path: flink-python/pyflink/table/tests/test_table_environment_api.py
##########
@@ -445,6 +446,46 @@ def test_to_retract_stream(self):
                     "(True, <Row(2, 'Hello')>)"]
         self.assertEqual(result, expected)
 
+    def test_collect_for_all_data_types(self):
+
+        def collect_from_source(element_data, expected_output):
+            source = self.t_env.from_elements(element_data, ["a"])
+            result = source.execute().collect()
+            collected_result = []
+            for i in result:
+                collected_result.append(i)
+            self.assertEqual(expected_output, collected_result)
+
+        data_in_different_types = [True, 1, "a", u"a", datetime.date(1970, 1, 
1),
+                                   datetime.time(0, 0, 0), 1.0, [1], (1,), 
{"a": 1}, bytearray(1)]
+        expected_results = [Row([True]), Row([1]), Row(['a']), Row(['a']),
+                            Row([datetime.date(1970, 1, 1)]), 
Row([datetime.time(0, 0)]),
+                            Row([1.0]), Row([[1]]), Row([Row([1])]), 
Row([{'a': 1}]),
+                            Row([bytearray(b'\x00')])]
+        zipped_datas = zip(data_in_different_types, expected_results)
+        for data, expected_data in zipped_datas:
+            element_data = [Row(data) for _ in range(2)]
+            expected_output = [expected_data for _ in range(2)]
+            collect_from_source(element_data, expected_output)
+
+    def test_collect_with_retract(self):
+        element_data = [(1, 2, 'a'),
+                        (3, 4, 'b'),
+                        (5, 6, 'a'),
+                        (7, 8, 'b')]
+
+        source = self.t_env.from_elements(element_data, ["a", "b", "c"])
+        result = self.t_env.execute_sql("SELECT SUM(a), c FROM %s group by c" 
% source).collect()
+        collected_result = []
+        for i in result:
+            collected_result.append(i)
+        # The result contains one delete row and an insert row in retract 
operation for each key.

Review comment:
       PR #13371 has been merged and so could you also rebase the PR after 
addressing the above comments.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to