shuiqiangchen commented on a change in pull request #12902:
URL: https://github.com/apache/flink/pull/12902#discussion_r505124750



##########
File path: flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -162,7 +171,42 @@
                return objs;
        }
 
+       private static List<byte[]> getPickledBytesFromRow(Row row, LogicalType[] dataTypes) throws IOException {
+               List<byte[]> pickledRowBytes = new ArrayList<>(row.getArity());
+               Pickler pickler = new Pickler();
+               for (int i = 0; i < row.getArity(); i++) {
+                       Object fieldData = row.getField(i);
+                       if (fieldData == null) {
+                               pickledRowBytes.add(new byte[0]);
+                       } else {
+                               if (dataTypes[i] instanceof DateType) {
+                                       long time = ((Date) fieldData).toLocalDate().toEpochDay();
+                                       pickledRowBytes.add(pickler.dumps(time));
+                               } else if (dataTypes[i] instanceof TimeType) {
+                                       long time = ((Time) fieldData).toLocalTime().toNanoOfDay();
+                                       time = time / 1000;
+                                       pickledRowBytes.add(pickler.dumps(time));
+                               } else if (dataTypes[i] instanceof RowType) {
+                                       Row tmpRow = (Row) fieldData;
+                                       LogicalType[] tmpRowFieldTypes = new LogicalType[tmpRow.getArity()];
+                                       ((RowType) dataTypes[i]).getChildren().toArray(tmpRowFieldTypes);
+                                       List<byte[]> rowFieldBytes = getPickledBytesFromRow(tmpRow, tmpRowFieldTypes);
+                                       pickledRowBytes.add(pickler.dumps(rowFieldBytes));
+                               } else {

Review comment:
       Types that cannot be serialized directly by pickle are handled by the
branches above (DateType, TimeType, and nested RowType); every other type falls
through to the final else branch and is serialized by pickle as-is. So all data
types are supported here.
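
To illustrate the point, here is a minimal sketch of the conversions applied to the two temporal types before they are pickled. It covers only the `DateType` and `TimeType` branches from the diff and leaves out the `Pickler` call, so it runs on the JDK alone. The class name `TemporalFieldSketch` and the helper names `toEpochDay` and `toMicroOfDay` are hypothetical and not part of the PR.

```java
import java.sql.Date;
import java.sql.Time;
import java.time.LocalDate;
import java.time.LocalTime;

// Hypothetical helper class, for illustration only; not part of PythonBridgeUtils.
public class TemporalFieldSketch {

    // Mirrors the DateType branch: days since 1970-01-01.
    static long toEpochDay(Date date) {
        return date.toLocalDate().toEpochDay();
    }

    // Mirrors the TimeType branch: nanoseconds of day divided by 1000,
    // which gives microseconds since midnight.
    static long toMicroOfDay(Time time) {
        return time.toLocalTime().toNanoOfDay() / 1000;
    }

    public static void main(String[] args) {
        // 1970-01-02 is one day after the epoch.
        System.out.println(toEpochDay(Date.valueOf(LocalDate.of(1970, 1, 2))));   // prints 1
        // 00:00:01 is one second, i.e. 1,000,000 microseconds, after midnight.
        System.out.println(toMicroOfDay(Time.valueOf(LocalTime.of(0, 0, 1))));    // prints 1000000
    }
}
```

Both helpers return a plain `long`, which pickle can serialize directly; that is why these branches convert first and then call `pickler.dumps` on the result.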



