dianfu commented on a change in pull request #12902:
URL: https://github.com/apache/flink/pull/12902#discussion_r484217079
##########
File path: flink-python/pyflink/table/utils.py
##########
@@ -76,3 +79,61 @@ def to_expression_jarray(exprs):
"""
gateway = get_gateway()
return to_jarray(gateway.jvm.Expression, [expr._j_expr for expr in exprs])
+
+
+class CloseableIterator(object):
Review comment:
Move this class to table_result?
##########
File path: flink-python/pyflink/table/utils.py
##########
@@ -76,3 +79,61 @@ def to_expression_jarray(exprs):
"""
gateway = get_gateway()
return to_jarray(gateway.jvm.Expression, [expr._j_expr for expr in exprs])
+
+
+class CloseableIterator(object):
+ """
Review comment:
The current implementation of this class is actually not closeable.
Could you update it a bit as follows:
- Add a close() method
- Add __enter__ and __exit__ methods
- Add documentation in TableResult.collect describing that close should be
called at the end. You could refer to the Java classes for more details
- Add an example in TableResult.collect on how to use the returned result
using the `with` statement
##########
File path: flink-python/pyflink/table/utils.py
##########
@@ -76,3 +79,61 @@ def to_expression_jarray(exprs):
"""
gateway = get_gateway()
return to_jarray(gateway.jvm.Expression, [expr._j_expr for expr in exprs])
+
+
+class CloseableIterator(object):
+ """
+ Representing an Iterator that is also auto closeable.
+ """
+ def __init__(self, j_closeable_iterator, field_data_types):
+ self._j_closeable_iterator = j_closeable_iterator
+ self._j_field_data_types = field_data_types
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ if not self._j_closeable_iterator.hasNext():
+ raise StopIteration("No more data.")
+ gateway = get_gateway()
+ pickle_bytes = gateway.jvm.PythonBridgeUtils. \
+ getPickledBytesFromRow(self._j_closeable_iterator.next(),
+ self._j_field_data_types)
+ pickle_bytes = list(pickle_bytes)
+ data_types = [_from_java_type(j_field_data_type)
Review comment:
data_types could be generated in the constructor. There is no need to
generate it for each element.
##########
File path: flink-python/pyflink/table/tests/test_table_environment_api.py
##########
@@ -445,6 +446,46 @@ def test_to_retract_stream(self):
"(True, <Row(2, 'Hello')>)"]
self.assertEqual(result, expected)
+ def test_collect_for_all_data_types(self):
Review comment:
Could you make the tests run in both the blink planner and the legacy
planner?
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
return objs;
}
+ private static List<byte[]> getPickledBytesFromRow(Row row,
LogicalType[] dataTypes) throws IOException {
+ List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+ Pickler pickler = new Pickler();
+ for (int i = 0; i < row.getArity(); i++) {
+ Object fieldData = row.getField(i);
+ if (fieldData == null) {
+ pickledRowBytes.add(new byte[0]);
+ } else {
+ if (dataTypes[i] instanceof DateType) {
+ long time = ((Date)
fieldData).toLocalDate().toEpochDay();
+
pickledRowBytes.add(pickler.dumps(time));
+ } else if (dataTypes[i] instanceof TimeType) {
+ long time = ((Time)
fieldData).toLocalTime().toNanoOfDay();
+ time = time / 1000;
+
pickledRowBytes.add(pickler.dumps(time));
+ } else if (dataTypes[i] instanceof RowType) {
+ Row tmpRow = (Row) fieldData;
+ LogicalType[] tmpRowFieldTypes = new
LogicalType[tmpRow.getArity()];
+ ((RowType)
dataTypes[i]).getChildren().toArray(tmpRowFieldTypes);
Review comment:
What about changing it as follows:
tmpRowFieldTypes = ((RowType) dataTypes[i]).getChildren().toArray(new
LogicalType[0])
##########
File path: flink-python/pyflink/table/tests/test_table_environment_api.py
##########
@@ -445,6 +446,46 @@ def test_to_retract_stream(self):
"(True, <Row(2, 'Hello')>)"]
self.assertEqual(result, expected)
+ def test_collect_for_all_data_types(self):
+
+ def collect_from_source(element_data, expected_output):
+ source = self.t_env.from_elements(element_data, ["a"])
+ result = source.execute().collect()
+ collected_result = []
+ for i in result:
+ collected_result.append(i)
+ self.assertEqual(expected_output, collected_result)
+
+ data_in_different_types = [True, 1, "a", u"a", datetime.date(1970, 1,
1),
+ datetime.time(0, 0, 0), 1.0, [1], (1,),
{"a": 1}, bytearray(1)]
+ expected_results = [Row([True]), Row([1]), Row(['a']), Row(['a']),
+ Row([datetime.date(1970, 1, 1)]),
Row([datetime.time(0, 0)]),
+ Row([1.0]), Row([[1]]), Row([Row([1])]),
Row([{'a': 1}]),
+ Row([bytearray(b'\x00')])]
+ zipped_datas = zip(data_in_different_types, expected_results)
+ for data, expected_data in zipped_datas:
+ element_data = [Row(data) for _ in range(2)]
+ expected_output = [expected_data for _ in range(2)]
+ collect_from_source(element_data, expected_output)
+
+ def test_collect_with_retract(self):
+ element_data = [(1, 2, 'a'),
+ (3, 4, 'b'),
+ (5, 6, 'a'),
+ (7, 8, 'b')]
+
+ source = self.t_env.from_elements(element_data, ["a", "b", "c"])
+ result = self.t_env.execute_sql("SELECT SUM(a), c FROM %s group by c"
% source).collect()
+ collected_result = []
+ for i in result:
+ collected_result.append(i)
+ # The result contains one delete row and an insert row in retract
operation for each key.
Review comment:
It seems that we should also consider the retraction flag, i.e. the
rowKind field?
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
return objs;
}
+ private static List<byte[]> getPickledBytesFromRow(Row row,
LogicalType[] dataTypes) throws IOException {
Review comment:
Should we call the initialize() method?
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
return objs;
}
+ private static List<byte[]> getPickledBytesFromRow(Row row,
LogicalType[] dataTypes) throws IOException {
+ List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+ Pickler pickler = new Pickler();
+ for (int i = 0; i < row.getArity(); i++) {
+ Object fieldData = row.getField(i);
+ if (fieldData == null) {
Review comment:
Could you add a test case for this, i.e. a row with a None field?
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
return objs;
}
+ private static List<byte[]> getPickledBytesFromRow(Row row,
LogicalType[] dataTypes) throws IOException {
+ List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+ Pickler pickler = new Pickler();
+ for (int i = 0; i < row.getArity(); i++) {
+ Object fieldData = row.getField(i);
+ if (fieldData == null) {
+ pickledRowBytes.add(new byte[0]);
+ } else {
+ if (dataTypes[i] instanceof DateType) {
+ long time = ((Date)
fieldData).toLocalDate().toEpochDay();
+
pickledRowBytes.add(pickler.dumps(time));
+ } else if (dataTypes[i] instanceof TimeType) {
+ long time = ((Time)
fieldData).toLocalTime().toNanoOfDay();
+ time = time / 1000;
+
pickledRowBytes.add(pickler.dumps(time));
+ } else if (dataTypes[i] instanceof RowType) {
+ Row tmpRow = (Row) fieldData;
+ LogicalType[] tmpRowFieldTypes = new
LogicalType[tmpRow.getArity()];
+ ((RowType)
dataTypes[i]).getChildren().toArray(tmpRowFieldTypes);
+ List<byte[]> rowFieldBytes =
getPickledBytesFromRow(tmpRow, tmpRowFieldTypes);
+
pickledRowBytes.add(pickler.dumps(rowFieldBytes));
+ } else {
+
pickledRowBytes.add(pickler.dumps(row.getField(i)));
Review comment:
It seems that the implementation doesn't consider composite types
such as ArrayType, MapType, etc., e.g. when the element of an array is a Row.
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
return objs;
}
+ private static List<byte[]> getPickledBytesFromRow(Row row,
LogicalType[] dataTypes) throws IOException {
+ List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+ Pickler pickler = new Pickler();
+ for (int i = 0; i < row.getArity(); i++) {
+ Object fieldData = row.getField(i);
+ if (fieldData == null) {
+ pickledRowBytes.add(new byte[0]);
+ } else {
+ if (dataTypes[i] instanceof DateType) {
+ long time = ((Date)
fieldData).toLocalDate().toEpochDay();
+
pickledRowBytes.add(pickler.dumps(time));
+ } else if (dataTypes[i] instanceof TimeType) {
+ long time = ((Time)
fieldData).toLocalTime().toNanoOfDay();
+ time = time / 1000;
+
pickledRowBytes.add(pickler.dumps(time));
+ } else if (dataTypes[i] instanceof RowType) {
+ Row tmpRow = (Row) fieldData;
+ LogicalType[] tmpRowFieldTypes = new
LogicalType[tmpRow.getArity()];
+ ((RowType)
dataTypes[i]).getChildren().toArray(tmpRowFieldTypes);
+ List<byte[]> rowFieldBytes =
getPickledBytesFromRow(tmpRow, tmpRowFieldTypes);
+
pickledRowBytes.add(pickler.dumps(rowFieldBytes));
+ } else {
Review comment:
Aren't all types supported? If not, it would be great to throw
meaningful exceptions when unsupported types are encountered.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]